You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/02/05 01:15:02 UTC

svn commit: r1564574 - in /hive/branches/tez: ./ itests/qtest/ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ shims/common/src/main/j...

Author: gunther
Date: Wed Feb  5 00:15:02 2014
New Revision: 1564574

URL: http://svn.apache.org/r1564574
Log:
HIVE-6346: Add Hadoop-2.4.0 shims to hive-tez (Gopal V via Gunther Hagleitner)

Added:
    hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java
Modified:
    hive/branches/tez/itests/qtest/pom.xml
    hive/branches/tez/pom.xml
    hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java

Modified: hive/branches/tez/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/pom.xml?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/pom.xml (original)
+++ hive/branches/tez/itests/qtest/pom.xml Wed Feb  5 00:15:02 2014
@@ -273,6 +273,12 @@
           <classifier>tests</classifier>
         </dependency>
         <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+          <version>${hadoop-23.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-common</artifactId>
           <version>${hbase.hadoop2.version}</version>

Modified: hive/branches/tez/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/pom.xml?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/pom.xml (original)
+++ hive/branches/tez/pom.xml Wed Feb  5 00:15:02 2014
@@ -98,7 +98,7 @@
     <groovy.version>2.1.6</groovy.version>
     <hadoop-20.version>0.20.2</hadoop-20.version>
     <hadoop-20S.version>1.2.1</hadoop-20S.version>
-    <hadoop-23.version>2.2.0</hadoop-23.version>
+    <hadoop-23.version>2.3.0-SNAPSHOT</hadoop-23.version>
     <hbase.hadoop1.version>0.96.0-hadoop1</hbase.hadoop1.version>
     <hbase.hadoop2.version>0.96.0-hadoop2</hbase.hadoop2.version>
     <!-- httpcomponents are not always in version sync -->

Modified: hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Wed Feb  5 00:15:02 2014
@@ -42,6 +42,7 @@ import javax.security.auth.login.LoginEx
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,8 +51,10 @@ import org.apache.hadoop.fs.ProxyFileSys
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -773,4 +776,16 @@ public class Hadoop20Shims implements Ha
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    /* zero-copy reads are not supported by this shim; null signals that to the caller */
+    return null;
+  }
+
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    /* direct decompression is not supported by this shim, whatever the codec; callers get null */
+    return null;
+  }
 }

Modified: hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Wed Feb  5 00:15:02 2014
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +38,9 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.ProxyFileSystem;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
@@ -410,4 +413,16 @@ public class Hadoop20SShims extends Hado
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    /* zero-copy reads are not supported by this shim; null signals that to the caller */
+    return null;
+  }
+
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    /* direct decompression is not supported by this shim, whatever the codec; callers get null */
+    return null;
+  }
 }

Modified: hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Wed Feb  5 00:15:02 2014
@@ -27,11 +27,13 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.HashMap;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.io.FileNotFoundException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -41,7 +43,12 @@ import org.apache.hadoop.fs.ProxyFileSys
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
@@ -71,6 +78,19 @@ public class Hadoop23Shims extends Hadoo
 
   HadoopShims.MiniDFSShim cluster = null;
 
+  final boolean zeroCopy;
+
+  /** Enables the zero-copy shims only if org.apache.hadoop.fs.CacheFlag can be loaded. */
+  public Hadoop23Shims() {
+    boolean cacheFlagPresent = true;
+    try {
+      Class.forName("org.apache.hadoop.fs.CacheFlag", false, ShimLoader.class.getClassLoader());
+    } catch (ClassNotFoundException ignored) {
+      cacheFlagPresent = false; // class absent: getZeroCopyReader/getDirectDecompressor return null
+    }
+    this.zeroCopy = cacheFlagPresent;
+  }
+
   @Override
   public String getTaskAttemptLogUrl(JobConf conf,
     String taskTrackerHttpAddress, String taskAttemptId)
@@ -556,5 +576,23 @@ public class Hadoop23Shims extends Hadoo
     ret.put("MAPREDSETUPCLEANUPNEEDED", "mapreduce.job.committer.setup.cleanup.needed");
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
+ }
+
+  /** Returns a zero-copy reader for {@code in}, or null when zero-copy support was not detected. */
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    if (!zeroCopy) {
+      return null; /* not supported */
+    }
+    return ZeroCopyShims.getZeroCopyReader(in, pool);
+  }
+
+  /** Returns a direct decompressor for {@code codec}, or null when none is available. */
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    if (!zeroCopy) {
+      return null; /* not supported: gated on the same probe as zero-copy reads */
+    }
+    return ZeroCopyShims.getDirectDecompressor(codec);
   }
 }

Added: hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java?rev=1564574&view=auto
==============================================================================
--- hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java (added)
+++ hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java Wed Feb  5 00:15:02 2014
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.shims;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.CompressionHeader;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+
+import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+
+/** Adapts Hadoop's zero-copy read and direct-decompression classes to the Hive shim interfaces. */
+class ZeroCopyShims {
+  /** Exposes a Hive ByteBufferPoolShim as a Hadoop ByteBufferPool. */
+  private static final class ByteBufferPoolAdapter implements ByteBufferPool {
+    private final ByteBufferPoolShim pool;
+
+    public ByteBufferPoolAdapter(ByteBufferPoolShim pool) {
+      this.pool = pool;
+    }
+
+    @Override
+    public final ByteBuffer getBuffer(boolean direct, int length) {
+      return this.pool.getBuffer(direct, length);
+    }
+
+    @Override
+    public final void putBuffer(ByteBuffer buffer) {
+      this.pool.putBuffer(buffer);
+    }
+  }
+
+  /** Serves readBuffer/releaseBuffer by delegating to the wrapped FSDataInputStream. */
+  private static final class ZeroCopyAdapter implements ZeroCopyReaderShim {
+    private final FSDataInputStream in;
+    private final ByteBufferPoolAdapter pool;
+    private final static EnumSet<ReadOption> CHECK_SUM = EnumSet
+        .noneOf(ReadOption.class);
+    private final static EnumSet<ReadOption> NO_CHECK_SUM = EnumSet
+        .of(ReadOption.SKIP_CHECKSUMS);
+
+    public ZeroCopyAdapter(FSDataInputStream in, ByteBufferPoolShim poolshim) {
+      this.in = in;
+      this.pool = (poolshim == null) ? null : new ByteBufferPoolAdapter(poolshim);
+    }
+
+    @Override
+    public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
+        throws IOException {
+      // Only pass SKIP_CHECKSUMS when the caller does not ask for verification.
+      EnumSet<ReadOption> options = verifyChecksums ? CHECK_SUM : NO_CHECK_SUM;
+      return this.in.read(this.pool, maxLength, options);
+    }
+
+    @Override
+    public final void releaseBuffer(ByteBuffer buffer) {
+      this.in.releaseBuffer(buffer);
+    }
+  }
+
+  /** Wraps {@code in} (and {@code pool}, which may be null) in a new ZeroCopyReaderShim. */
+  public static ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+      ByteBufferPoolShim pool) throws IOException {
+    return new ZeroCopyAdapter(in, pool);
+  }
+
+  /** Forwards decompress calls to a Hadoop DirectDecompressor. */
+  private static final class DirectDecompressorAdapter implements
+      DirectDecompressorShim {
+    private final DirectDecompressor decompressor;
+
+    public DirectDecompressorAdapter(DirectDecompressor decompressor) {
+      this.decompressor = decompressor;
+    }
+
+    @Override
+    public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException {
+      this.decompressor.decompress(src, dst);
+    }
+  }
+
+  /**
+   * Creates a direct decompressor for the given codec.
+   *
+   * @param codec compression type to decode
+   * @return a decompressor shim, or null when {@code codec} has no direct decompressor here
+   */
+  public static DirectDecompressorShim getDirectDecompressor(
+      DirectCompressionType codec) {
+    switch (codec) {
+    case ZLIB:
+      return new DirectDecompressorAdapter(new ZlibDirectDecompressor());
+    case ZLIB_NOHEADER:
+      return new DirectDecompressorAdapter(
+          new ZlibDirectDecompressor(CompressionHeader.NO_HEADER, 0));
+    case SNAPPY:
+      return new DirectDecompressorAdapter(new SnappyDirectDecompressor());
+    default:
+      /* not supported (NONE, or a type added later) */
+      return null;
+    }
+  }
+}

Modified: hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Feb  5 00:15:02 2014
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -36,11 +37,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -520,4 +523,68 @@ public interface HadoopShims {
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map<String, String> getHadoopConfNames();
+
+  /**
+   * A shim for Hadoop's org.apache.hadoop.io.ByteBufferPool.
+   */
+  public interface ByteBufferPoolShim {
+    /**
+     * Get a new ByteBuffer from the pool.  The pool can provide this by
+     * removing a buffer from its internal cache, or by allocating a
+     * new buffer.
+     *
+     * @param direct     Whether the buffer should be direct.
+     * @param length     The requested length of the buffer.
+     * @return           A new ByteBuffer. Its capacity can be less
+     *                   than what was requested, but must be at
+     *                   least 1 byte.
+     */
+    ByteBuffer getBuffer(boolean direct, int length);
+
+    /**
+     * Release a buffer back to the pool.
+     * The pool may choose to put this buffer into its cache or free it.
+     *
+     * @param buffer    the buffer to release
+     */
+    void putBuffer(ByteBuffer buffer);
+  }
+
+  /**
+   * Provides an HDFS ZeroCopyReader shim.
+   * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
+   * @param pool ByteBufferPoolShim to allocate fallback buffers with
+   *
+   * @return the reader shim, or null if not supported
+   */
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+  public interface ZeroCopyReaderShim {
+    /**
+     * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
+     * Also moves the input stream forward by that amount. The data read can be smaller than maxLength.
+     *
+     * @return ByteBuffer read from the stream
+     */
+    public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+    /**
+     * Release a ByteBuffer obtained from a previous readBuffer call on this reader.
+     *
+     * @param buffer the buffer to release
+     */
+    public void releaseBuffer(ByteBuffer buffer);
+  }
+
+  /** Compression formats a direct decompressor can be requested for. */
+  public enum DirectCompressionType {
+    NONE, ZLIB_NOHEADER, ZLIB, SNAPPY
+  }
+
+  /** Decompresses from one ByteBuffer into another. */
+  public interface DirectDecompressorShim {
+    void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
+  }
+
+  /** @return a decompressor for {@code codec}, or null if not supported */
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec);
 }