You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/02/05 01:15:02 UTC
svn commit: r1564574 - in /hive/branches/tez: ./ itests/qtest/
shims/0.20/src/main/java/org/apache/hadoop/hive/shims/
shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/
shims/0.23/src/main/java/org/apache/hadoop/hive/shims/
shims/common/src/main/j...
Author: gunther
Date: Wed Feb 5 00:15:02 2014
New Revision: 1564574
URL: http://svn.apache.org/r1564574
Log:
HIVE-6346: Add Hadoop-2.4.0 shims to hive-tez (Gopal V via Gunther Hagleitner)
Added:
hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java
Modified:
hive/branches/tez/itests/qtest/pom.xml
hive/branches/tez/pom.xml
hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
Modified: hive/branches/tez/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/pom.xml?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/pom.xml (original)
+++ hive/branches/tez/itests/qtest/pom.xml Wed Feb 5 00:15:02 2014
@@ -273,6 +273,12 @@
<classifier>tests</classifier>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop-23.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.hadoop2.version}</version>
Modified: hive/branches/tez/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/pom.xml?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/pom.xml (original)
+++ hive/branches/tez/pom.xml Wed Feb 5 00:15:02 2014
@@ -98,7 +98,7 @@
<groovy.version>2.1.6</groovy.version>
<hadoop-20.version>0.20.2</hadoop-20.version>
<hadoop-20S.version>1.2.1</hadoop-20S.version>
- <hadoop-23.version>2.2.0</hadoop-23.version>
+ <hadoop-23.version>2.3.0-SNAPSHOT</hadoop-23.version>
<hbase.hadoop1.version>0.96.0-hadoop1</hbase.hadoop1.version>
<hbase.hadoop2.version>0.96.0-hadoop2</hbase.hadoop2.version>
<!-- httpcomponents are not always in version sync -->
Modified: hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/tez/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Wed Feb 5 00:15:02 2014
@@ -42,6 +42,7 @@ import javax.security.auth.login.LoginEx
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,8 +51,10 @@ import org.apache.hadoop.fs.ProxyFileSys
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
@@ -773,4 +776,16 @@ public class Hadoop20Shims implements Ha
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ /* not supported */
+ return null;
+ }
+
+ @Override
+ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+ /* not supported */
+ return null;
+ }
}
Modified: hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/tez/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Wed Feb 5 00:15:02 2014
@@ -30,6 +30,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +38,9 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.ProxyFileSystem;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.ClusterStatus;
@@ -410,4 +413,16 @@ public class Hadoop20SShims extends Hado
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ /* not supported */
+ return null;
+ }
+
+ @Override
+ public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+ /* not supported */
+ return null;
+ }
}
Modified: hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Wed Feb 5 00:15:02 2014
@@ -27,11 +27,13 @@ import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.io.FileNotFoundException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -41,7 +43,12 @@ import org.apache.hadoop.fs.ProxyFileSys
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
@@ -71,6 +78,19 @@ public class Hadoop23Shims extends Hadoo
HadoopShims.MiniDFSShim cluster = null;
+  /**
+   * True when org.apache.hadoop.fs.CacheFlag could be loaded at construction time; gates every
+   * call into ZeroCopyShims. (Presumably CacheFlag is used as a marker for the Hadoop release
+   * that ships the zero-copy APIs — confirm if the probe class is ever changed.)
+   */
+  final boolean zeroCopy;
+
+  public Hadoop23Shims() {
+    this.zeroCopy = isClassLoadable("org.apache.hadoop.fs.CacheFlag");
+  }
+
+  /**
+   * Checks whether a class is visible to the shim class loader, without initializing it.
+   *
+   * @param className fully-qualified name of the class to look for
+   * @return true if the class was found, false on ClassNotFoundException
+   */
+  private static boolean isClassLoadable(String className) {
+    try {
+      Class.forName(className, false, ShimLoader.class.getClassLoader());
+      return true;
+    } catch (ClassNotFoundException ignored) {
+      // Absence is an expected outcome of the probe, not an error: report "unsupported".
+      return false;
+    }
+  }
+
@Override
public String getTaskAttemptLogUrl(JobConf conf,
String taskTrackerHttpAddress, String taskAttemptId)
@@ -556,5 +576,23 @@ public class Hadoop23Shims extends Hadoo
ret.put("MAPREDSETUPCLEANUPNEEDED", "mapreduce.job.committer.setup.cleanup.needed");
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
+ }
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ if(zeroCopy) {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+ /* not supported */
+ return null;
+ }
+
+  /**
+   * {@inheritDoc}
+   * <p>Delegates to ZeroCopyShims only when the constructor's class probe succeeded; otherwise
+   * returns null ("not supported").
+   */
+  @Override
+  public DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    return zeroCopy ? ZeroCopyShims.getDirectDecompressor(codec) : null;
   }
}
Added: hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java?rev=1564574&view=auto
==============================================================================
--- hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java (added)
+++ hive/branches/tez/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ZeroCopyShims.java Wed Feb 5 00:15:02 2014
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.shims;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.CompressionHeader;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+
+import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+
+/**
+ * Adapters from Hive's shim interfaces onto Hadoop's zero-copy read API
+ * ({@link FSDataInputStream#read(ByteBufferPool, int, EnumSet)}) and its direct
+ * (ByteBuffer-to-ByteBuffer) decompressors. Hadoop23Shims calls into this class only after
+ * its constructor's class probe succeeds.
+ */
+class ZeroCopyShims {
+
+  /** Exposes a Hive {@link ByteBufferPoolShim} as a Hadoop {@link ByteBufferPool}. */
+  private static final class ByteBufferPoolAdapter implements ByteBufferPool {
+    private final ByteBufferPoolShim pool;
+
+    public ByteBufferPoolAdapter(ByteBufferPoolShim pool) {
+      this.pool = pool;
+    }
+
+    @Override
+    public final ByteBuffer getBuffer(boolean direct, int length) {
+      return this.pool.getBuffer(direct, length);
+    }
+
+    @Override
+    public final void putBuffer(ByteBuffer buffer) {
+      this.pool.putBuffer(buffer);
+    }
+  }
+
+  /** Implements {@link ZeroCopyReaderShim} by delegating to an {@link FSDataInputStream}. */
+  private static final class ZeroCopyAdapter implements ZeroCopyReaderShim {
+    // Shared option sets passed to FSDataInputStream#read; never modified here.
+    private static final EnumSet<ReadOption> CHECK_SUM = EnumSet.noneOf(ReadOption.class);
+    private static final EnumSet<ReadOption> NO_CHECK_SUM = EnumSet.of(ReadOption.SKIP_CHECKSUMS);
+
+    private final FSDataInputStream in;
+    // Null when the caller supplied no fallback pool; passed through to Hadoop as-is.
+    private final ByteBufferPoolAdapter pool;
+
+    public ZeroCopyAdapter(FSDataInputStream in, ByteBufferPoolShim poolshim) {
+      this.in = in;
+      this.pool = (poolshim == null) ? null : new ByteBufferPoolAdapter(poolshim);
+    }
+
+    @Override
+    public final ByteBuffer readBuffer(int maxLength, boolean verifyChecksums)
+        throws IOException {
+      EnumSet<ReadOption> options = verifyChecksums ? CHECK_SUM : NO_CHECK_SUM;
+      return this.in.read(this.pool, maxLength, options);
+    }
+
+    @Override
+    public final void releaseBuffer(ByteBuffer buffer) {
+      this.in.releaseBuffer(buffer);
+    }
+  }
+
+  /**
+   * Wraps {@code in} in a {@link ZeroCopyReaderShim}.
+   *
+   * @param in stream to read from
+   * @param pool pool used for fallback buffers; may be null
+   * @return a new reader (never null)
+   * @throws IOException declared by the shim contract; not thrown by this implementation
+   */
+  public static ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+      ByteBufferPoolShim pool) throws IOException {
+    return new ZeroCopyAdapter(in, pool);
+  }
+
+  /** Exposes a Hadoop {@link DirectDecompressor} as a Hive {@link DirectDecompressorShim}. */
+  private static final class DirectDecompressorAdapter implements DirectDecompressorShim {
+    private final DirectDecompressor decompressor;
+
+    public DirectDecompressorAdapter(DirectDecompressor decompressor) {
+      this.decompressor = decompressor;
+    }
+
+    @Override
+    public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException {
+      this.decompressor.decompress(src, dst);
+    }
+  }
+
+  /**
+   * Creates a direct decompressor for the given compression type.
+   *
+   * @param codec compression type; null is treated like NONE
+   * @return a decompressor, or null when no direct decompressor exists for {@code codec}
+   */
+  public static DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec) {
+    if (codec == null) {
+      return null; // nothing to decompress with; same answer as NONE
+    }
+    DirectDecompressor decompressor;
+    switch (codec) {
+      case ZLIB:
+        decompressor = new ZlibDirectDecompressor();
+        break;
+      case ZLIB_NOHEADER:
+        decompressor = new ZlibDirectDecompressor(CompressionHeader.NO_HEADER, 0);
+        break;
+      case SNAPPY:
+        decompressor = new SnappyDirectDecompressor();
+        break;
+      default:
+        // NONE (and any constant added later) has no direct decompressor: not supported.
+        return null;
+    }
+    return new DirectDecompressorAdapter(decompressor);
+  }
+}
Modified: hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1564574&r1=1564573&r2=1564574&view=diff
==============================================================================
--- hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/tez/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Wed Feb 5 00:15:02 2014
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Comparator;
import java.util.Iterator;
@@ -36,11 +37,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -520,4 +523,68 @@ public interface HadoopShims {
public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
public Map<String, String> getHadoopConfNames();
+
+  /**
+   * Hive-side shim for Hadoop's {@code org.apache.hadoop.io.ByteBufferPool}.
+   */
+  public interface ByteBufferPoolShim {
+    /**
+     * Get a new ByteBuffer from the pool. The pool can provide this by removing a buffer from
+     * its internal cache, or by allocating a new buffer.
+     *
+     * @param direct Whether the buffer should be direct.
+     * @param length The requested capacity. The pool may return a smaller buffer (see below).
+     * @return A new ByteBuffer. Its capacity can be less than what was requested, but must be
+     *         at least 1 byte.
+     */
+    ByteBuffer getBuffer(boolean direct, int length);
+
+    /**
+     * Release a buffer back to the pool.
+     * The pool may choose to put this buffer into its cache or free it.
+     *
+     * @param buffer a buffer previously obtained from {@link #getBuffer(boolean, int)}; it may
+     *        be direct or heap-backed, depending on the {@code direct} flag it was requested with
+     */
+    void putBuffer(ByteBuffer buffer);
+  }
+
+  /**
+   * Provides an HDFS ZeroCopyReader shim.
+   *
+   * @param in FSDataInputStream to read from (the stream that cached/mmapped buffers are tied to)
+   * @param pool ByteBufferPoolShim to allocate fallback buffers with
+   * @return a reader, or null if zero-copy reads are not supported by this shim
+   * @throws IOException if an implementation fails while setting up the reader
+   */
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+  /**
+   * Reader that hands out ByteBuffers directly from an FSDataInputStream.
+   */
+  public interface ZeroCopyReaderShim {
+    /**
+     * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a
+     * MappedByteBuffer. Also moves the stream forward by the number of bytes returned, which
+     * can be smaller than maxLength.
+     * NOTE(review): the Hadoop API behind the 0.23 shim may return null at end of stream —
+     * confirm before relying on a non-null result.
+     *
+     * @param maxLength upper bound on the number of bytes to return
+     * @param verifyChecksums whether checksums should be verified for this read
+     * @return ByteBuffer read from the stream
+     * @throws IOException if the read fails
+     */
+    public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+
+    /**
+     * Release a ByteBuffer obtained from {@link #readBuffer(int, boolean)} on this reader.
+     *
+     * @param buffer a buffer previously returned by readBuffer
+     */
+    public void releaseBuffer(ByteBuffer buffer);
+  }
+
+ public enum DirectCompressionType {
+ NONE,
+ ZLIB_NOHEADER,
+ ZLIB,
+ SNAPPY,
+ };
+
+  /** Decompresses from one ByteBuffer into another. */
+  public interface DirectDecompressorShim {
+    /**
+     * Decompress the contents of {@code src} into {@code dst}.
+     *
+     * @param src compressed input
+     * @param dst destination for the decompressed bytes
+     * @throws IOException if decompression fails
+     */
+    void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
+  }
+
+  /**
+   * Returns a direct decompressor for the given compression type.
+   *
+   * @param codec compression type to decompress
+   * @return a decompressor, or null if this shim does not support {@code codec}
+   */
+  DirectDecompressorShim getDirectDecompressor(DirectCompressionType codec);
}