Posted to commits@hive.apache.org by gu...@apache.org on 2014/02/26 09:50:23 UTC

svn commit: r1571964 - in /hive/branches/tez: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/org/apache/hadoop/hive/ql/io/orc/

Author: gunther
Date: Wed Feb 26 08:50:22 2014
New Revision: 1571964

URL: http://svn.apache.org/r1571964
Log:
HIVE-6347: ZeroCopy read path for ORC RecordReader (Gopal V, reviewed by Owen)
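
For reference, the new read path is off by default and is gated by the hive.exec.orc.zerocopy flag added below; RecordReaderImpl checks it via HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY) and only requests a ZeroCopyReaderShim when the stream is uncompressed or its codec reports a working direct decompressor. A minimal sketch of enabling the flag on a Hadoop Configuration follows (illustration only, not part of this commit; the EnableOrcZeroCopy class name is made up):

    import org.apache.hadoop.conf.Configuration;

    public class EnableOrcZeroCopy {
      public static void main(String[] args) {
        // Sketch only: set the new flag on the job/session Configuration.
        // RecordReaderImpl reads it with HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY)
        // and falls back to ordinary readFully() when the codec has no
        // direct (off-heap) decompression support.
        Configuration conf = new Configuration();
        conf.setBoolean("hive.exec.orc.zerocopy", true); // default is false
      }
    }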

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java
Modified:
    hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/tez/conf/hive-default.xml.template
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java

Modified: hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Feb 26 08:50:22 2014
@@ -536,6 +536,8 @@ public class HiveConf extends Configurat
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),
     HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false),
 
+    HIVE_ORC_ZEROCOPY("hive.exec.orc.zerocopy", false),
+
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", true),
     HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", true),

Modified: hive/branches/tez/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/tez/conf/hive-default.xml.template?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/conf/hive-default.xml.template (original)
+++ hive/branches/tez/conf/hive-default.xml.template Wed Feb 26 08:50:22 2014
@@ -2368,6 +2368,14 @@
 </property>
 
 <property>
+  <name>hive.exec.orc.zerocopy</name>
+  <value>false</value>
+  <description>
+    Use zerocopy reads with ORC.
+  </description>
+</property>
+
+<property>
   <name>hive.jar.directory</name>
   <value>hdfs:///user/hive/</value>
   <description>

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java?rev=1571964&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DirectDecompressionCodec.java Wed Feb 26 08:50:22 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface DirectDecompressionCodec extends CompressionCodec {
+  public boolean isAvailable();
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Wed Feb 26 08:50:22 2014
@@ -21,8 +21,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 abstract class InStream extends InputStream {
 
+  private static final Log LOG = LogFactory.getLog(InStream.class);
+
   private static class UncompressedStream extends InStream {
     private final String name;
     private final ByteBuffer[] bytes;
@@ -172,7 +177,7 @@ abstract class InStream extends InputStr
               bufferSize + " needed = " + chunkLength);
         }
         // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
-		assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+        assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
         currentOffset += OutStream.HEADER_SIZE;
 
         ByteBuffer slice = this.slice(chunkLength);
@@ -274,14 +279,23 @@ abstract class InStream extends InputStr
             chunkLength + " bytes");
       }
 
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+            compressed.remaining(), len));
+      }
+
       // we need to consolidate 2 or more buffers into 1
-      // first clear out compressed buffers
+      // first copy out compressed buffers
       ByteBuffer copy = allocateBuffer(chunkLength);
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
 
       while (len > 0 && (++currentRange) < bytes.length) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+        }
         compressed = bytes[currentRange].duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java Wed Feb 26 08:50:22 2014
@@ -122,6 +122,9 @@ class MemoryManager {
       totalAllocation -= val.allocation;
       updateScale(false);
     }
+    if(writerList.isEmpty()) {
+      rowsAddedSinceCheck = 0;
+    }
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Feb 26 08:50:22 2014
@@ -28,7 +28,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +38,8 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY;
 import org.apache.hadoop.hive.ql.exec.vector.*;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -53,6 +57,10 @@ import org.apache.hadoop.io.FloatWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.*;
+
+import com.google.common.collect.ComparisonChain;
 
 class RecordReaderImpl implements RecordReader {
 
@@ -87,6 +95,89 @@ class RecordReaderImpl implements Record
   private boolean[] includedRowGroups = null;
   private final Configuration conf;
 
+  private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
+  private final ZeroCopyReaderShim zcr;
+
+  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
+  // which lacks a clear()/clean() operation
+  public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
+    private static final class Key implements Comparable<Key> {
+      private final int capacity;
+      private final long insertionGeneration;
+
+      Key(int capacity, long insertionGeneration) {
+        this.capacity = capacity;
+        this.insertionGeneration = insertionGeneration;
+      }
+
+      @Override
+      public int compareTo(Key other) {
+        return ComparisonChain.start().compare(capacity, other.capacity)
+            .compare(insertionGeneration, other.insertionGeneration).result();
+      }
+
+      @Override
+      public boolean equals(Object rhs) {
+        if (rhs == null) {
+          return false;
+        }
+        try {
+          Key o = (Key) rhs;
+          return (compareTo(o) == 0);
+        } catch (ClassCastException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder().append(capacity).append(insertionGeneration)
+            .toHashCode();
+      }
+    }
+
+    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
+
+    private long currentGeneration = 0;
+
+    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+      return direct ? directBuffers : buffers;
+    }
+
+    public void clear() {
+      buffers.clear();
+      directBuffers.clear();
+    }
+
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
+      if (entry == null) {
+        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
+            .allocate(length);
+      }
+      tree.remove(entry.getKey());
+      return entry.getValue();
+    }
+
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+      while (true) {
+        Key key = new Key(buffer.capacity(), currentGeneration++);
+        if (!tree.containsKey(key)) {
+          tree.put(key, buffer);
+          return;
+        }
+        // Buffers are indexed by (capacity, generation).
+        // If our key is not unique on the first try, we try again
+      }
+    }
+  }
+
   RecordReaderImpl(Iterable<StripeInformation> stripes,
                    FileSystem fileSystem,
                    Path path,
@@ -130,6 +221,18 @@ class RecordReaderImpl implements Record
       }
     }
 
+    final boolean zeroCopy = (conf != null)
+        && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
+
+    if (zeroCopy
+        && (codec == null || ((codec instanceof DirectDecompressionCodec)
+            && ((DirectDecompressionCodec) codec).isAvailable()))) {
+      /* codec is null or is available */
+      this.zcr = ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
+    } else {
+      this.zcr = null;
+    }
+
     firstRow = skippedRows;
     totalRowCount = rows;
     reader = createTreeReader(path, 0, types, included, conf);
@@ -2283,6 +2386,11 @@ class RecordReaderImpl implements Record
       is.close();
     }
     if(bufferChunks != null) {
+      if(zcr != null) {
+        for (BufferChunk bufChunk : bufferChunks) {
+          zcr.releaseBuffer(bufChunk.chunk);
+        }
+      }
       bufferChunks.clear();
     }
     streams.clear();
@@ -2599,10 +2707,20 @@ class RecordReaderImpl implements Record
     for(DiskRange range: ranges) {
       int len = (int) (range.end - range.offset);
       long off = range.offset;
-      file.seek(base + off); 
-      byte[] buffer = new byte[len];
-      file.readFully(buffer, 0, buffer.length);
-      result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+      file.seek(base + off);
+      if(zcr != null) {
+        while(len > 0) {
+          ByteBuffer partial = zcr.readBuffer(len, false);
+          result.add(new BufferChunk(partial, off));
+          int read = partial.remaining();
+          len -= read;
+          off += read;
+        }
+      } else {
+        byte[] buffer = new byte[len];
+        file.readFully(buffer, 0, buffer.length);
+        result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+      }
     }
     return result;
   }
@@ -2840,6 +2958,7 @@ class RecordReaderImpl implements Record
   @Override
   public void close() throws IOException {
     clearStreams();
+    pool.clear();
     file.close();
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java Wed Feb 26 08:50:22 2014
@@ -18,12 +18,17 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
 import org.iq80.snappy.Snappy;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-class SnappyCodec implements CompressionCodec {
+class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+
+  Boolean direct = null;
 
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
@@ -57,6 +62,10 @@ class SnappyCodec implements Compression
 
   @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if(in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
     int inOffset = in.position();
     int uncompressLen =
         Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
@@ -64,4 +73,30 @@ class SnappyCodec implements Compression
     out.position(uncompressLen + out.position());
     out.flip();
   }
+
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      try {
+        if (ShimLoader.getHadoopShims().getDirectDecompressor(
+            DirectCompressionType.SNAPPY) != null) {
+          direct = Boolean.valueOf(true);
+        } else {
+          direct = Boolean.valueOf(false);
+        }
+      } catch (UnsatisfiedLinkError ule) {
+        direct = Boolean.valueOf(false);
+      }
+    }
+    return direct.booleanValue();
+  }
+
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+        .getDirectDecompressor(DirectCompressionType.SNAPPY);
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Wed Feb 26 08:50:22 2014
@@ -67,6 +67,7 @@ public class VectorizedOrcInputFormat ex
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
       this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+
       try {
         rbCtx = new VectorizedRowBatchCtx();
         rbCtx.init(conf, fileSplit);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java Wed Feb 26 08:50:22 2014
@@ -23,7 +23,14 @@ import java.util.zip.DataFormatException
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
-class ZlibCodec implements CompressionCodec {
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
+
+  private Boolean direct = null;
 
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
@@ -55,6 +62,12 @@ class ZlibCodec implements CompressionCo
 
   @Override
   public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+
+    if(in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+
     Inflater inflater = new Inflater(true);
     inflater.setInput(in.array(), in.arrayOffset() + in.position(),
                       in.remaining());
@@ -74,4 +87,30 @@ class ZlibCodec implements CompressionCo
     in.position(in.limit());
   }
 
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      // see nowrap option in new Inflater(boolean) which disables zlib headers
+      try {
+        if (ShimLoader.getHadoopShims().getDirectDecompressor(
+            DirectCompressionType.ZLIB_NOHEADER) != null) {
+          direct = Boolean.valueOf(true);
+        } else {
+          direct = Boolean.valueOf(false);
+        }
+      } catch (UnsatisfiedLinkError ule) {
+        direct = Boolean.valueOf(false);
+      }
+    }
+    return direct.booleanValue();
+  }
+
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+        .getDirectDecompressor(DirectCompressionType.ZLIB_NOHEADER);
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
 }

Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Wed Feb 26 08:50:22 2014
@@ -29,6 +29,8 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -39,6 +41,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY;
+
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -67,14 +72,19 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hive.common.util.HiveTestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Tests for the top level reader/streamFactory of ORC files.
  */
+@RunWith(value = Parameterized.class)
 public class TestOrcFile {
 
   public static class SimpleStruct {
@@ -191,6 +201,16 @@ public class TestOrcFile {
   Configuration conf;
   FileSystem fs;
   Path testFilePath;
+  private final boolean zeroCopy;
+
+  @Parameters
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList(new Boolean[][] { {false}, {true}});
+  }
+
+  public TestOrcFile(Boolean zcr) {
+    zeroCopy = zcr.booleanValue();
+  }
 
   @Rule
   public TestName testCaseName = new TestName();
@@ -198,6 +218,9 @@ public class TestOrcFile {
   @Before
   public void openFileSystem () throws Exception {
     conf = new Configuration();
+    if(zeroCopy) {
+      conf.setBoolean(HIVE_ORC_ZEROCOPY.varname, zeroCopy);
+    }
     fs = FileSystem.getLocal(conf);
     testFilePath = new Path(workDir, "TestOrcFile." +
         testCaseName.getMethodName() + ".orc");
@@ -547,6 +570,7 @@ public class TestOrcFile {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (InnerStruct.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
+
     Writer writer = OrcFile.createWriter(testFilePath,
         OrcFile.writerOptions(conf)
             .inspector(inspector)
@@ -572,35 +596,36 @@ public class TestOrcFile {
     StripeStatistics ss1 = metadata.getStripeStatistics().get(0);
     StripeStatistics ss2 = metadata.getStripeStatistics().get(1);
     StripeStatistics ss3 = metadata.getStripeStatistics().get(2);
-    assertEquals(4996, ss1.getColumnStatistics()[0].getNumberOfValues());
+
+    assertEquals(5000, ss1.getColumnStatistics()[0].getNumberOfValues());
     assertEquals(5000, ss2.getColumnStatistics()[0].getNumberOfValues());
-    assertEquals(1004, ss3.getColumnStatistics()[0].getNumberOfValues());
+    assertEquals(1000, ss3.getColumnStatistics()[0].getNumberOfValues());
 
-    assertEquals(4996, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getNumberOfValues());
+    assertEquals(5000, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getNumberOfValues());
     assertEquals(5000, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getNumberOfValues());
-    assertEquals(1004, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getNumberOfValues());
+    assertEquals(1000, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getNumberOfValues());
     assertEquals(1, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getMinimum());
-    assertEquals(1, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getMinimum());
-    assertEquals(2, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getMinimum());
+    assertEquals(2, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getMinimum());
+    assertEquals(3, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getMinimum());
     assertEquals(1, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getMaximum());
     assertEquals(2, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getMaximum());
     assertEquals(3, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getMaximum());
-    assertEquals(4996, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getSum());
-    assertEquals(9996, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getSum());
-    assertEquals(3008, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getSum());
+    assertEquals(5000, ((IntegerColumnStatistics)ss1.getColumnStatistics()[1]).getSum());
+    assertEquals(10000, ((IntegerColumnStatistics)ss2.getColumnStatistics()[1]).getSum());
+    assertEquals(3000, ((IntegerColumnStatistics)ss3.getColumnStatistics()[1]).getSum());
 
-    assertEquals(4996, ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getNumberOfValues());
+    assertEquals(5000, ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getNumberOfValues());
     assertEquals(5000, ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getNumberOfValues());
-    assertEquals(1004, ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getNumberOfValues());
+    assertEquals(1000, ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getNumberOfValues());
     assertEquals("one", ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getMinimum());
-    assertEquals("one", ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getMinimum());
+    assertEquals("two", ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getMinimum());
     assertEquals("three", ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getMinimum());
     assertEquals("one", ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getMaximum());
     assertEquals("two", ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getMaximum());
-    assertEquals("two", ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getMaximum());
-    assertEquals(14988, ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getSum());
+    assertEquals("three", ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getMaximum());
+    assertEquals(15000, ((StringColumnStatistics)ss1.getColumnStatistics()[2]).getSum());
     assertEquals(15000, ((StringColumnStatistics)ss2.getColumnStatistics()[2]).getSum());
-    assertEquals(5012, ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getSum());
+    assertEquals(5000, ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getSum());
 
     RecordReaderImpl recordReader = (RecordReaderImpl) reader.rows(null);
     OrcProto.RowIndex[] index = recordReader.readRowIndex(0);

Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java?rev=1571964&r1=1571963&r2=1571964&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java Wed Feb 26 08:50:22 2014
@@ -262,7 +262,7 @@ public class TestStringRedBlackTree {
     bit.testCompressedSeek();
     bit.testBiggerItems();
     bit.testSkips();
-    TestOrcFile test1 = new TestOrcFile();
+    TestOrcFile test1 = new TestOrcFile(false);
     test1.test1();
     test1.emptyFile();
     test1.metaData();