Posted to commits@hive.apache.org by pr...@apache.org on 2014/06/17 19:59:26 UTC

svn commit: r1603240 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/results/clientpositive/ ql/src/test/results/clientpo...

Author: prasanthj
Date: Tue Jun 17 17:59:25 2014
New Revision: 1603240

URL: http://svn.apache.org/r1603240
Log:
HIVE-7219: Improve performance of serialization utils in ORC (Prasanth J reviewed by Gunther Hagleitner, Gopal V)

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestUnrolledBitPack.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestSerializationUtils.java
    hive/trunk/ql/src/test/results/clientpositive/annotate_stats_filter.q.out
    hive/trunk/ql/src/test/results/clientpositive/annotate_stats_groupby.q.out
    hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out
    hive/trunk/ql/src/test/results/clientpositive/annotate_stats_part.q.out
    hive/trunk/ql/src/test/results/clientpositive/annotate_stats_union.q.out
    hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_opt_vectorization.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_analyze.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_analyze.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jun 17 17:59:25 2014
@@ -552,6 +552,9 @@ public class HiveConf extends Configurat
         true),
     // Define the default compression codec for ORC file
     HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
+    // Define the default encoding strategy to use
+    HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED",
+        new StringsValidator("SPEED", "COMPRESSION")),
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Jun 17 17:59:25 2014
@@ -1970,6 +1970,17 @@
 </property>
 
 <property>
+  <name>hive.exec.orc.encoding.strategy</name>
+  <value>SPEED</value>
+  <description>
+    Define the encoding strategy to use while writing data. Changing this will
+    only affect the lightweight encoding for integers. This flag will not
+    change the compression level of higher-level compression codecs (like ZLIB).
+    Possible options are SPEED and COMPRESSION.
+  </description>
+</property>
+
+<property>
   <name>hive.exec.orc.dictionary.key.size.threshold</name>
   <value>0.8</value>
   <description>
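
For illustration, the new setting can also be applied programmatically through the Hadoop Configuration that backs HiveConf; a minimal sketch (the property name is the one added above, everything else is standard Configuration API):

    import org.apache.hadoop.conf.Configuration;

    public class EncodingStrategyConfExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Added by this change: valid values are SPEED (default) and COMPRESSION.
        // COMPRESSION favors smaller integer streams over write speed.
        conf.set("hive.exec.orc.encoding.strategy", "COMPRESSION");
        System.out.println(conf.get("hive.exec.orc.encoding.strategy"));
      }
    }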

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Jun 17 17:59:25 2014
@@ -95,6 +95,9 @@ public final class OrcFile {
     }
   }
 
+  public static enum EncodingStrategy {
+    SPEED, COMPRESSION;
+  }
 
   // Note : these string definitions for table properties are deprecated,
   // and retained only for backward compatibility, please do not add to
@@ -117,7 +120,8 @@ public final class OrcFile {
     STRIPE_SIZE("orc.stripe.size"),
     ROW_INDEX_STRIDE("orc.row.index.stride"),
     ENABLE_INDEXES("orc.create.index"),
-    BLOCK_PADDING("orc.block.padding");
+    BLOCK_PADDING("orc.block.padding"),
+    ENCODING_STRATEGY("orc.encoding.strategy");
 
     private final String propName;
 
@@ -221,6 +225,7 @@ public final class OrcFile {
     private MemoryManager memoryManagerValue;
     private Version versionValue;
     private WriterCallback callback;
+    private EncodingStrategy encodingStrategy;
 
     WriterOptions(Configuration conf) {
       configuration = conf;
@@ -250,6 +255,13 @@ public final class OrcFile {
       } else {
         versionValue = Version.byName(versionName);
       }
+      String enString =
+          conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname);
+      if (enString == null) {
+        encodingStrategy = EncodingStrategy.SPEED;
+      } else {
+        encodingStrategy = EncodingStrategy.valueOf(enString);
+      }
     }
 
     /**
@@ -301,6 +313,14 @@ public final class OrcFile {
     }
 
     /**
+     * Sets the encoding strategy that is used to encode the data.
+     */
+    public WriterOptions encodingStrategy(EncodingStrategy strategy) {
+      encodingStrategy = strategy;
+      return this;
+    }
+
+    /**
      * Sets the generic compression that is used to compress the data.
      */
     public WriterOptions compress(CompressionKind value) {
@@ -370,7 +390,7 @@ public final class OrcFile {
                           opts.stripeSizeValue, opts.compressValue,
                           opts.bufferSizeValue, opts.rowIndexStrideValue,
                           opts.memoryManagerValue, opts.blockPaddingValue,
-                          opts.versionValue, opts.callback);
+                          opts.versionValue, opts.callback, opts.encodingStrategy);
   }
 
   /**
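
A minimal usage sketch for the new builder hook (hedged: it assumes the existing OrcFile.writerOptions(conf) and OrcFile.createWriter(path, options) entry points, plus an ObjectInspector named inspector defined elsewhere):

    // Sketch only: request COMPRESSION instead of the SPEED default when
    // constructing a writer programmatically.
    OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
        .inspector(inspector)
        .encodingStrategy(OrcFile.EncodingStrategy.COMPRESSION);
    Writer writer = OrcFile.createWriter(path, options);

Table-level overrides go through the new orc.encoding.strategy table property handled in OrcOutputFormat below.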

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Tue Jun 17 17:59:25 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.io.Acid
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -164,6 +165,11 @@ public class OrcOutputFormat extends Fil
       options.blockPadding(Boolean.parseBoolean(propVal));
     }
 
+    if ((propVal = getSettingFromPropsFallingBackToConf(
+        OrcFile.OrcTableProperties.ENCODING_STRATEGY.getPropName(),props,conf)) != null){
+      options.encodingStrategy(EncodingStrategy.valueOf(propVal));
+    }
+
     return options;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Jun 17 17:59:25 2014
@@ -23,7 +23,6 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -746,9 +745,11 @@ class RecordReaderImpl implements Record
 
   private static class FloatTreeReader extends TreeReader{
     private InStream stream;
+    private final SerializationUtils utils;
 
     FloatTreeReader(Path path, int columnId, Configuration conf) {
       super(path, columnId, conf);
+      this.utils = new SerializationUtils();
     }
 
     @Override
@@ -777,7 +778,7 @@ class RecordReaderImpl implements Record
         } else {
           result = (FloatWritable) previous;
         }
-        result.set(SerializationUtils.readFloat(stream));
+        result.set(utils.readFloat(stream));
       }
       return result;
     }
@@ -797,7 +798,7 @@ class RecordReaderImpl implements Record
       // Read value entries based on isNull entries
       for (int i = 0; i < batchSize; i++) {
         if (!result.isNull[i]) {
-          result.vector[i] = SerializationUtils.readFloat(stream);
+          result.vector[i] = utils.readFloat(stream);
         } else {
 
           // If the value is not present then set NaN
@@ -819,16 +820,18 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(int i=0; i < items; ++i) {
-        SerializationUtils.readFloat(stream);
+        utils.readFloat(stream);
       }
     }
   }
 
   private static class DoubleTreeReader extends TreeReader{
     private InStream stream;
+    private final SerializationUtils utils;
 
     DoubleTreeReader(Path path, int columnId, Configuration conf) {
       super(path, columnId, conf);
+      this.utils = new SerializationUtils();
     }
 
     @Override
@@ -858,7 +861,7 @@ class RecordReaderImpl implements Record
         } else {
           result = (DoubleWritable) previous;
         }
-        result.set(SerializationUtils.readDouble(stream));
+        result.set(utils.readDouble(stream));
       }
       return result;
     }
@@ -878,7 +881,7 @@ class RecordReaderImpl implements Record
       // Read value entries based on isNull entries
       for (int i = 0; i < batchSize; i++) {
         if (!result.isNull[i]) {
-          result.vector[i] = SerializationUtils.readDouble(stream);
+          result.vector[i] = utils.readDouble(stream);
         } else {
           // If the value is not present then set NaN
           result.vector[i] = Double.NaN;
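
The switch from static SerializationUtils calls to a per-reader instance matters because, as the SerializationUtils diff later in this commit shows, each instance now carries a small reusable byte buffer, so a double can be filled with one bulk read instead of eight single-byte reads. A standalone sketch of that idea (not the exact Hive code):

    import java.io.IOException;
    import java.io.InputStream;

    final class BufferedDoubleReader {
      // Reusable scratch buffer: one allocation per reader, not per value.
      private final byte[] buf = new byte[8];

      double readDouble(InputStream in) throws IOException {
        int n = 0;
        while (n < 8) {                    // fill all 8 bytes in bulk
          int r = in.read(buf, n, 8 - n);
          if (r < 0) {
            throw new IOException("EOF while reading a double");
          }
          n += r;
        }
        long bits = (buf[0] & 0xffL)       // little-endian byte order
            | ((buf[1] & 0xffL) << 8)
            | ((buf[2] & 0xffL) << 16)
            | ((buf[3] & 0xffL) << 24)
            | ((buf[4] & 0xffL) << 32)
            | ((buf[5] & 0xffL) << 40)
            | ((buf[6] & 0xffL) << 48)
            | ((buf[7] & 0xffL) << 56);
        return Double.longBitsToDouble(bits);
      }
    }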

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Jun 17 17:59:25 2014
@@ -34,10 +34,12 @@ class RunLengthIntegerReader implements 
   private int delta = 0;
   private int used = 0;
   private boolean repeat = false;
+  private SerializationUtils utils;
 
   RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
     this.input = input;
     this.signed = signed;
+    this.utils = new SerializationUtils();
   }
 
   private void readValues() throws IOException {
@@ -55,9 +57,9 @@ class RunLengthIntegerReader implements 
       // convert from 0 to 255 to -128 to 127 by converting to a signed byte
       delta = (byte) (0 + delta);
       if (signed) {
-        literals[0] = SerializationUtils.readVslong(input);
+        literals[0] = utils.readVslong(input);
       } else {
-        literals[0] = SerializationUtils.readVulong(input);
+        literals[0] = utils.readVulong(input);
       }
     } else {
       repeat = false;
@@ -65,9 +67,9 @@ class RunLengthIntegerReader implements 
       used = 0;
       for(int i=0; i < numLiterals; ++i) {
         if (signed) {
-          literals[i] = SerializationUtils.readVslong(input);
+          literals[i] = utils.readVslong(input);
         } else {
-          literals[i] = SerializationUtils.readVulong(input);
+          literals[i] = utils.readVulong(input);
         }
       }
     }
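
For readers unfamiliar with the vslong/vulong helpers being touched here: values are zigzag-mapped (so small magnitudes stay small) and then written as base-128 varints, matching the formulas in the SerializationUtils diff further down. A small self-contained sketch:

    import java.io.ByteArrayOutputStream;

    final class VarintSketch {
      // Zigzag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ... (same formula as writeVslong)
      static long zigzagEncode(long val) {
        return (val << 1) ^ (val >> 63);
      }

      // Base-128 varint: low 7 bits first, high bit set on continuation bytes.
      static void writeVulong(ByteArrayOutputStream out, long value) {
        while (true) {
          if ((value & ~0x7fL) == 0) {
            out.write((int) value);
            return;
          }
          out.write((int) (0x80 | (value & 0x7f)));
          value >>>= 7;
        }
      }

      public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVulong(out, zigzagEncode(-50));  // -50 zigzags to 99, one byte
        writeVulong(out, 300);                // 300 takes two bytes: 0xAC 0x02
        System.out.println(out.size() + " bytes written");  // prints "3 bytes written"
      }
    }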

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Tue Jun 17 17:59:25 2014
@@ -39,12 +39,14 @@ class RunLengthIntegerReaderV2 implement
   private int numLiterals = 0;
   private int used = 0;
   private final boolean skipCorrupt;
+  private final SerializationUtils utils;
 
   RunLengthIntegerReaderV2(InStream input, boolean signed,
       Configuration conf) throws IOException {
     this.input = input;
     this.signed = signed;
     this.skipCorrupt = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
+    this.utils = new SerializationUtils();
   }
 
   private void readValues() throws IOException {
@@ -71,7 +73,7 @@ class RunLengthIntegerReaderV2 implement
     // extract the number of fixed bits
     int fb = (firstByte >>> 1) & 0x1f;
     if (fb != 0) {
-      fb = SerializationUtils.decodeBitWidth(fb);
+      fb = utils.decodeBitWidth(fb);
     }
 
     // extract the blob run length
@@ -81,9 +83,9 @@ class RunLengthIntegerReaderV2 implement
     // read the first value stored as vint
     long firstVal = 0;
     if (signed) {
-      firstVal = SerializationUtils.readVslong(input);
+      firstVal = utils.readVslong(input);
     } else {
-      firstVal = SerializationUtils.readVulong(input);
+      firstVal = utils.readVulong(input);
     }
 
     // store first value to result buffer
@@ -94,14 +96,14 @@ class RunLengthIntegerReaderV2 implement
     if (fb == 0) {
       // read the fixed delta value stored as vint (deltas can be negative even
       // if all number are positive)
-      long fd = SerializationUtils.readVslong(input);
+      long fd = utils.readVslong(input);
 
       // add fixed deltas to adjacent values
       for(int i = 0; i < len; i++) {
         literals[numLiterals++] = literals[numLiterals - 2] + fd;
       }
     } else {
-      long deltaBase = SerializationUtils.readVslong(input);
+      long deltaBase = utils.readVslong(input);
       // add delta base and first value
       literals[numLiterals++] = firstVal + deltaBase;
       prevVal = literals[numLiterals - 1];
@@ -110,7 +112,7 @@ class RunLengthIntegerReaderV2 implement
       // write the unpacked values, add it to previous value and store final
       // value to result buffer. if the delta base value is negative then it
       // is a decreasing sequence else an increasing sequence
-      SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+      utils.readInts(literals, numLiterals, len, fb, input);
       while (len > 0) {
         if (deltaBase < 0) {
           literals[numLiterals] = prevVal - literals[numLiterals];
@@ -128,7 +130,7 @@ class RunLengthIntegerReaderV2 implement
 
     // extract the number of fixed bits
     int fbo = (firstByte >>> 1) & 0x1f;
-    int fb = SerializationUtils.decodeBitWidth(fbo);
+    int fb = utils.decodeBitWidth(fbo);
 
     // extract the run length of data blob
     int len = (firstByte & 0x01) << 8;
@@ -144,7 +146,7 @@ class RunLengthIntegerReaderV2 implement
 
     // extract patch width
     int pwo = thirdByte & 0x1f;
-    int pw = SerializationUtils.decodeBitWidth(pwo);
+    int pw = utils.decodeBitWidth(pwo);
 
     // read fourth byte and extract patch gap width
     int fourthByte = input.read();
@@ -156,7 +158,7 @@ class RunLengthIntegerReaderV2 implement
     int pl = fourthByte & 0x1f;
 
     // read the next base width number of bytes to extract base value
-    long base = SerializationUtils.bytesToLongBE(input, bw);
+    long base = utils.bytesToLongBE(input, bw);
     long mask = (1L << ((bw * 8) - 1));
     // if MSB of base value is 1 then base is negative value else positive
     if ((base & mask) != 0) {
@@ -166,7 +168,7 @@ class RunLengthIntegerReaderV2 implement
 
     // unpack the data blob
     long[] unpacked = new long[len];
-    SerializationUtils.readInts(unpacked, 0, len, fb, input);
+    utils.readInts(unpacked, 0, len, fb, input);
 
     // unpack the patch blob
     long[] unpackedPatch = new long[pl];
@@ -174,8 +176,8 @@ class RunLengthIntegerReaderV2 implement
     if ((pw + pgw) > 64 && !skipCorrupt) {
       throw new IOException(ErrorMsg.ORC_CORRUPTED_READ.getMsg());
     }
-    int bitSize = SerializationUtils.getClosestFixedBits(pw + pgw);
-    SerializationUtils.readInts(unpackedPatch, 0, pl, bitSize, input);
+    int bitSize = utils.getClosestFixedBits(pw + pgw);
+    utils.readInts(unpackedPatch, 0, pl, bitSize, input);
 
     // apply the patch directly when decoding the packed data
     int patchIdx = 0;
@@ -241,7 +243,7 @@ class RunLengthIntegerReaderV2 implement
 
     // extract the number of fixed bits
     int fbo = (firstByte >>> 1) & 0x1f;
-    int fb = SerializationUtils.decodeBitWidth(fbo);
+    int fb = utils.decodeBitWidth(fbo);
 
     // extract the run length
     int len = (firstByte & 0x01) << 8;
@@ -250,11 +252,10 @@ class RunLengthIntegerReaderV2 implement
     len += 1;
 
     // write the unpacked values and zigzag decode to result buffer
-    SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+    utils.readInts(literals, numLiterals, len, fb, input);
     if (signed) {
       for(int i = 0; i < len; i++) {
-        literals[numLiterals] = SerializationUtils
-            .zigzagDecode(literals[numLiterals]);
+        literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
         numLiterals++;
       }
     } else {
@@ -275,10 +276,10 @@ class RunLengthIntegerReaderV2 implement
     len += RunLengthIntegerWriterV2.MIN_REPEAT;
 
     // read the repeated value which is store using fixed bytes
-    long val = SerializationUtils.bytesToLongBE(input, size);
+    long val = utils.bytesToLongBE(input, size);
 
     if (signed) {
-      val = SerializationUtils.zigzagDecode(val);
+      val = utils.zigzagDecode(val);
     }
 
     // repeat the value for length times

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java Tue Jun 17 17:59:25 2014
@@ -38,11 +38,13 @@ class RunLengthIntegerWriter implements 
   private long delta = 0;
   private boolean repeat = false;
   private int tailRunLength = 0;
+  private SerializationUtils utils;
 
   RunLengthIntegerWriter(PositionedOutputStream output,
                          boolean signed) {
     this.output = output;
     this.signed = signed;
+    this.utils = new SerializationUtils();
   }
 
   private void writeValues() throws IOException {
@@ -51,17 +53,17 @@ class RunLengthIntegerWriter implements 
         output.write(numLiterals - MIN_REPEAT_SIZE);
         output.write((byte) delta);
         if (signed) {
-          SerializationUtils.writeVslong(output, literals[0]);
+          utils.writeVslong(output, literals[0]);
         } else {
-          SerializationUtils.writeVulong(output, literals[0]);
+          utils.writeVulong(output, literals[0]);
         }
       } else {
         output.write(-numLiterals);
         for(int i=0; i < numLiterals; ++i) {
           if (signed) {
-            SerializationUtils.writeVslong(output, literals[i]);
+            utils.writeVslong(output, literals[i]);
           } else {
-            SerializationUtils.writeVulong(output, literals[i]);
+            utils.writeVulong(output, literals[i]);
           }
         }
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Tue Jun 17 17:59:25 2014
@@ -157,10 +157,19 @@ class RunLengthIntegerWriterV2 implement
   private long[] gapVsPatchList;
   private long min;
   private boolean isFixedDelta;
+  private SerializationUtils utils;
+  private boolean alignedBitpacking;
 
   RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+    this(output, signed, true);
+  }
+
+  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
+      boolean alignedBitpacking) {
     this.output = output;
     this.signed = signed;
+    this.alignedBitpacking = alignedBitpacking;
+    this.utils = new SerializationUtils();
     clear();
   }
 
@@ -187,6 +196,10 @@ class RunLengthIntegerWriterV2 implement
     int fb = bitsDeltaMax;
     int efb = 0;
 
+    if (alignedBitpacking) {
+      fb = utils.getClosestAlignedFixedBits(fb);
+    }
+
     if (isFixedDelta) {
       // if fixed run length is greater than threshold then it will be fixed
       // delta sequence with delta value 0 else fixed delta sequence with
@@ -206,20 +219,20 @@ class RunLengthIntegerWriterV2 implement
       if (fb == 1) {
         fb = 2;
       }
-      efb = SerializationUtils.encodeBitWidth(fb);
+      efb = utils.encodeBitWidth(fb);
       efb = efb << 1;
       len = variableRunLength - 1;
       variableRunLength = 0;
     }
 
     // extract the 9th bit of run length
-    int tailBits = (len & 0x100) >>> 8;
+    final int tailBits = (len & 0x100) >>> 8;
 
     // create first byte of the header
-    int headerFirstByte = getOpcode() | efb | tailBits;
+    final int headerFirstByte = getOpcode() | efb | tailBits;
 
     // second byte of the header stores the remaining 8 bits of runlength
-    int headerSecondByte = len & 0xff;
+    final int headerSecondByte = len & 0xff;
 
     // write header
     output.write(headerFirstByte);
@@ -227,43 +240,50 @@ class RunLengthIntegerWriterV2 implement
 
     // store the first value from zigzag literal array
     if (signed) {
-      SerializationUtils.writeVslong(output, literals[0]);
+      utils.writeVslong(output, literals[0]);
     } else {
-      SerializationUtils.writeVulong(output, literals[0]);
+      utils.writeVulong(output, literals[0]);
     }
 
     if (isFixedDelta) {
       // if delta is fixed then we don't need to store delta blob
-      SerializationUtils.writeVslong(output, fixedDelta);
+      utils.writeVslong(output, fixedDelta);
     } else {
       // store the first value as delta value using zigzag encoding
-      SerializationUtils.writeVslong(output, adjDeltas[0]);
+      utils.writeVslong(output, adjDeltas[0]);
+
       // adjacent delta values are bit packed
-      SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb,
-          output);
+      utils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb, output);
     }
   }
 
   private void writePatchedBaseValues() throws IOException {
 
+    // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+    // because patch is applied to MSB bits. For example: If fixed bit width of
+    // base value is 7 bits and if patch is 3 bits, the actual value is
+    // constructed by shifting the patch to left by 7 positions.
+    // actual_value = patch << 7 | base_value
+    // So, if we align base_value then actual_value can not be reconstructed.
+
     // write the number of fixed bits required in next 5 bits
-    int fb = brBits95p;
-    int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+    final int fb = brBits95p;
+    final int efb = utils.encodeBitWidth(fb) << 1;
 
     // adjust variable run length, they are one off
     variableRunLength -= 1;
 
     // extract the 9th bit of run length
-    int tailBits = (variableRunLength & 0x100) >>> 8;
+    final int tailBits = (variableRunLength & 0x100) >>> 8;
 
     // create first byte of the header
-    int headerFirstByte = getOpcode() | efb | tailBits;
+    final int headerFirstByte = getOpcode() | efb | tailBits;
 
     // second byte of the header stores the remaining 8 bits of runlength
-    int headerSecondByte = variableRunLength & 0xff;
+    final int headerSecondByte = variableRunLength & 0xff;
 
     // if the min value is negative toggle the sign
-    boolean isNegative = min < 0 ? true : false;
+    final boolean isNegative = min < 0 ? true : false;
     if (isNegative) {
       min = -min;
     }
@@ -271,9 +291,9 @@ class RunLengthIntegerWriterV2 implement
     // find the number of bytes required for base and shift it by 5 bits
     // to accommodate patch width. The additional bit is used to store the sign
     // of the base value.
-    int baseWidth = SerializationUtils.findClosestNumBits(min) + 1;
-    int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
-    int bb = (baseBytes - 1) << 5;
+    final int baseWidth = utils.findClosestNumBits(min) + 1;
+    final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+    final int bb = (baseBytes - 1) << 5;
 
     // if the base value is negative then set MSB to 1
     if (isNegative) {
@@ -282,11 +302,11 @@ class RunLengthIntegerWriterV2 implement
 
     // third byte contains 3 bits for number of bytes occupied by base
     // and 5 bits for patchWidth
-    int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+    final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
 
     // fourth byte contains 3 bits for page gap width and 5 bits for
     // patch length
-    int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+    final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
 
     // write header
     output.write(headerFirstByte);
@@ -301,15 +321,16 @@ class RunLengthIntegerWriterV2 implement
     }
 
     // base reduced literals are bit packed
-    int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
-    SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length,
-        closestFixedBits, output);
+    int closestFixedBits = utils.getClosestFixedBits(fb);
+
+    utils.writeInts(baseRedLiterals, 0, baseRedLiterals.length, closestFixedBits,
+        output);
 
     // write patch list
-    closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
-        + patchWidth);
-    SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length,
-        closestFixedBits, output);
+    closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
+
+    utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
+        output);
 
     // reset run length
     variableRunLength = 0;
@@ -326,27 +347,32 @@ class RunLengthIntegerWriterV2 implement
   private void writeDirectValues() throws IOException {
 
     // write the number of fixed bits required in next 5 bits
-    int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+    int fb = zzBits100p;
+
+    if (alignedBitpacking) {
+      fb = utils.getClosestAlignedFixedBits(fb);
+    }
+
+    final int efb = utils.encodeBitWidth(fb) << 1;
 
     // adjust variable run length
     variableRunLength -= 1;
 
     // extract the 9th bit of run length
-    int tailBits = (variableRunLength & 0x100) >>> 8;
+    final int tailBits = (variableRunLength & 0x100) >>> 8;
 
     // create first byte of the header
-    int headerFirstByte = getOpcode() | efb | tailBits;
+    final int headerFirstByte = getOpcode() | efb | tailBits;
 
     // second byte of the header stores the remaining 8 bits of runlength
-    int headerSecondByte = variableRunLength & 0xff;
+    final int headerSecondByte = variableRunLength & 0xff;
 
     // write header
     output.write(headerFirstByte);
     output.write(headerSecondByte);
 
     // bit packing the zigzag encoded literals
-    SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length,
-        zzBits100p, output);
+    utils.writeInts(zigzagLiterals, 0, zigzagLiterals.length, fb, output);
 
     // reset run length
     variableRunLength = 0;
@@ -356,13 +382,13 @@ class RunLengthIntegerWriterV2 implement
     // get the value that is repeating, compute the bits and bytes required
     long repeatVal = 0;
     if (signed) {
-      repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+      repeatVal = utils.zigzagEncode(literals[0]);
     } else {
       repeatVal = literals[0];
     }
 
-    int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
-    int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+    final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
+    final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
         : (numBitsRepeatVal >>> 3) + 1;
 
     // write encoding type in top 2 bits
@@ -440,7 +466,7 @@ class RunLengthIntegerWriterV2 implement
         // populate zigzag encoded literals
         long zzEncVal = 0;
         if (signed) {
-          zzEncVal = SerializationUtils.zigzagEncode(literals[i]);
+          zzEncVal = utils.zigzagEncode(literals[i]);
         } else {
           zzEncVal = literals[i];
         }
@@ -464,7 +490,7 @@ class RunLengthIntegerWriterV2 implement
 
       // stores the number of bits required for packing delta blob in
       // delta encoding
-      bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+      bitsDeltaMax = utils.findClosestNumBits(deltaMax);
 
       // if decreasing count equals total number of literals then the
       // sequence is monotonically decreasing
@@ -504,10 +530,10 @@ class RunLengthIntegerWriterV2 implement
     // is not significant then we can use direct or delta encoding
 
     double p = 0.9;
-    zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p);
+    zzBits90p = utils.percentileBits(zigzagLiterals, p);
 
     p = 1.0;
-    zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p);
+    zzBits100p = utils.percentileBits(zigzagLiterals, p);
 
     int diffBitsLH = zzBits100p - zzBits90p;
 
@@ -524,11 +550,11 @@ class RunLengthIntegerWriterV2 implement
       // 95th percentile width is used to determine max allowed value
       // after which patching will be done
       p = 0.95;
-      brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p);
+      brBits95p = utils.percentileBits(baseRedLiterals, p);
 
       // 100th percentile is used to compute the max patch width
       p = 1.0;
-      brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p);
+      brBits100p = utils.percentileBits(baseRedLiterals, p);
 
       // after base reducing the values, if the difference in bits between
       // 95th percentile and 100th percentile value is zero then there
@@ -573,7 +599,7 @@ class RunLengthIntegerWriterV2 implement
 
     // #bit for patch
     patchWidth = brBits100p - brBits95p;
-    patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+    patchWidth = utils.getClosestFixedBits(patchWidth);
 
     // if patch bit requirement is 64 then it will not possible to pack
     // gap and patch together in a long. To make sure gap and patch can be
@@ -619,7 +645,7 @@ class RunLengthIntegerWriterV2 implement
     if (maxGap == 0 && patchLength != 0) {
       patchGapWidth = 1;
     } else {
-      patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+      patchGapWidth = utils.findClosestNumBits(maxGap);
     }
 
     // special case: if the patch gap width is greater than 256, then
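
When alignedBitpacking is enabled (the constructor default here, and presumably what the SPEED strategy selects once the extra argument added to OrcFile.createWriter reaches WriterImpl), the writer rounds the required bit width up to the next byte-friendly width (1, 2, 4, 8, 16, 24, ..., 64) via getClosestAlignedFixedBits so the new unrolled packers in SerializationUtils can be used; COMPRESSION keeps the tighter width. A tiny sketch of that rounding:

    public class AlignedBitsSketch {
      // Mirrors getClosestAlignedFixedBits from the SerializationUtils diff below.
      static int closestAlignedFixedBits(int n) {
        int[] widths = {1, 2, 4, 8, 16, 24, 32, 40, 48, 56, 64};
        for (int w : widths) {
          if (n <= w) {
            return w;
          }
        }
        return 64;
      }

      public static void main(String[] args) {
        // SPEED rounds 7 bits up to 8 and 19 bits up to 24, trading a few bits
        // of output size for byte-aligned, unrolled packing.
        System.out.println(closestAlignedFixedBits(7));   // 8
        System.out.println(closestAlignedFixedBits(19));  // 24
      }
    }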

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Tue Jun 17 17:59:25 2014
@@ -26,10 +26,16 @@ import java.math.BigInteger;
 
 final class SerializationUtils {
 
-  // unused
-  private SerializationUtils() {}
+  private final static int BUFFER_SIZE = 64;
+  private final byte[] readBuffer;
+  private final byte[] writeBuffer;
 
-  static void writeVulong(OutputStream output, long value) throws IOException {
+  public SerializationUtils() {
+    this.readBuffer = new byte[BUFFER_SIZE];
+    this.writeBuffer = new byte[BUFFER_SIZE];
+  }
+
+  void writeVulong(OutputStream output, long value) throws IOException {
     while (true) {
       if ((value & ~0x7f) == 0) {
         output.write((byte) value);
@@ -41,12 +47,12 @@ final class SerializationUtils {
     }
   }
 
-  static void writeVslong(OutputStream output, long value) throws IOException {
+  void writeVslong(OutputStream output, long value) throws IOException {
     writeVulong(output, (value << 1) ^ (value >> 63));
   }
 
 
-  static long readVulong(InputStream in) throws IOException {
+  long readVulong(InputStream in) throws IOException {
     long result = 0;
     long b;
     int offset = 0;
@@ -61,18 +67,18 @@ final class SerializationUtils {
     return result;
   }
 
-  static long readVslong(InputStream in) throws IOException {
+  long readVslong(InputStream in) throws IOException {
     long result = readVulong(in);
     return (result >>> 1) ^ -(result & 1);
   }
 
-  static float readFloat(InputStream in) throws IOException {
+  float readFloat(InputStream in) throws IOException {
     int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
       (in.read() << 24);
     return Float.intBitsToFloat(ser);
   }
 
-  static void writeFloat(OutputStream output, float value) throws IOException {
+  void writeFloat(OutputStream output, float value) throws IOException {
     int ser = Float.floatToIntBits(value);
     output.write(ser & 0xff);
     output.write((ser >> 8) & 0xff);
@@ -80,29 +86,36 @@ final class SerializationUtils {
     output.write((ser >> 24) & 0xff);
   }
 
-  static double readDouble(InputStream in) throws IOException {
-  long ser = (long) in.read() |
-             ((long) in.read() << 8) |
-             ((long) in.read() << 16) |
-             ((long) in.read() << 24) |
-             ((long) in.read() << 32) |
-             ((long) in.read() << 40) |
-             ((long) in.read() << 48) |
-             ((long) in.read() << 56);
-    return Double.longBitsToDouble(ser);
-  }
-
-  static void writeDouble(OutputStream output,
-                          double value) throws IOException {
-    long ser = Double.doubleToLongBits(value);
-    output.write(((int) ser) & 0xff);
-    output.write(((int) (ser >> 8)) & 0xff);
-    output.write(((int) (ser >> 16)) & 0xff);
-    output.write(((int) (ser >> 24)) & 0xff);
-    output.write(((int) (ser >> 32)) & 0xff);
-    output.write(((int) (ser >> 40)) & 0xff);
-    output.write(((int) (ser >> 48)) & 0xff);
-    output.write(((int) (ser >> 56)) & 0xff);
+  double readDouble(InputStream in) throws IOException {
+    return Double.longBitsToDouble(readLongLE(in));
+  }
+
+  long readLongLE(InputStream in) throws IOException {
+    in.read(readBuffer, 0, 8);
+    return (((readBuffer[0] & 0xff) << 0)
+        + ((readBuffer[1] & 0xff) << 8)
+        + ((readBuffer[2] & 0xff) << 16)
+        + ((long) (readBuffer[3] & 0xff) << 24)
+        + ((long) (readBuffer[4] & 0xff) << 32)
+        + ((long) (readBuffer[5] & 0xff) << 40)
+        + ((long) (readBuffer[6] & 0xff) << 48)
+        + ((long) (readBuffer[7] & 0xff) << 56));
+  }
+
+  void writeDouble(OutputStream output, double value) throws IOException {
+    writeLongLE(output, Double.doubleToLongBits(value));
+  }
+
+  private void writeLongLE(OutputStream output, long value) throws IOException {
+    writeBuffer[0] = (byte) ((value >> 0)  & 0xff);
+    writeBuffer[1] = (byte) ((value >> 8)  & 0xff);
+    writeBuffer[2] = (byte) ((value >> 16) & 0xff);
+    writeBuffer[3] = (byte) ((value >> 24) & 0xff);
+    writeBuffer[4] = (byte) ((value >> 32) & 0xff);
+    writeBuffer[5] = (byte) ((value >> 40) & 0xff);
+    writeBuffer[6] = (byte) ((value >> 48) & 0xff);
+    writeBuffer[7] = (byte) ((value >> 56) & 0xff);
+    output.write(writeBuffer, 0, 8);
   }
 
   /**
@@ -198,7 +211,7 @@ final class SerializationUtils {
    * @param value
    * @return bits required to store value
    */
-  static int findClosestNumBits(long value) {
+  int findClosestNumBits(long value) {
     int count = 0;
     while (value != 0) {
       count++;
@@ -212,7 +225,7 @@ final class SerializationUtils {
    * @param val
    * @return zigzag encoded value
    */
-  static long zigzagEncode(long val) {
+  long zigzagEncode(long val) {
     return (val << 1) ^ (val >> 63);
   }
 
@@ -221,7 +234,7 @@ final class SerializationUtils {
    * @param val
   * @return zigzag decoded value
    */
-  static long zigzagDecode(long val) {
+  long zigzagDecode(long val) {
     return (val >>> 1) ^ -(val & 1);
   }
 
@@ -231,7 +244,7 @@ final class SerializationUtils {
    * @param p - percentile value (>=0.0 to <=1.0)
    * @return pth percentile bits
    */
-  static int percentileBits(long[] data, double p) {
+  int percentileBits(long[] data, double p) {
     if ((p > 1.0) || (p <= 0.0)) {
       return -1;
     }
@@ -265,7 +278,7 @@ final class SerializationUtils {
    * @param b - byte array
    * @return long value
    */
-  static long bytesToLongBE(InStream input, int n) throws IOException {
+  long bytesToLongBE(InStream input, int n) throws IOException {
     long out = 0;
     long val = 0;
     while (n > 0) {
@@ -283,7 +296,7 @@ final class SerializationUtils {
    * @param numBits - bit width
    * @return number of bytes required
    */
-  static int getTotalBytesRequired(int n, int numBits) {
+  int getTotalBytesRequired(int n, int numBits) {
     return (n * numBits + 7) / 8;
   }
 
@@ -293,7 +306,7 @@ final class SerializationUtils {
    * @param n
    * @return closest valid fixed bit
    */
-  static int getClosestFixedBits(int n) {
+  int getClosestFixedBits(int n) {
     if (n == 0) {
       return 1;
     }
@@ -319,13 +332,39 @@ final class SerializationUtils {
     }
   }
 
+  public int getClosestAlignedFixedBits(int n) {
+    if (n == 0 ||  n == 1) {
+      return 1;
+    } else if (n > 1 && n <= 2) {
+      return 2;
+    } else if (n > 2 && n <= 4) {
+      return 4;
+    } else if (n > 4 && n <= 8) {
+      return 8;
+    } else if (n > 8 && n <= 16) {
+      return 16;
+    } else if (n > 16 && n <= 24) {
+      return 24;
+    } else if (n > 24 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
   /**
    * Finds the closest available fixed bit width match and returns its encoded
    * value (ordinal)
    * @param n - fixed bit width to encode
    * @return encoded fixed bit width
    */
-  static int encodeBitWidth(int n) {
+  int encodeBitWidth(int n) {
     n = getClosestFixedBits(n);
 
     if (n >= 1 && n <= 24) {
@@ -354,7 +393,7 @@ final class SerializationUtils {
    * @param n - encoded fixed bit width
    * @return decoded fixed bit width
    */
-  static int decodeBitWidth(int n) {
+  int decodeBitWidth(int n) {
     if (n >= FixedBitSizes.ONE.ordinal()
         && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
       return n + 1;
@@ -386,13 +425,51 @@ final class SerializationUtils {
    * @param output - output stream
    * @throws IOException
    */
-  static void writeInts(long[] input, int offset, int len, int bitSize,
+  void writeInts(long[] input, int offset, int len, int bitSize,
                         OutputStream output) throws IOException {
     if (input == null || input.length < 1 || offset < 0 || len < 1
         || bitSize < 1) {
       return;
     }
 
+    switch (bitSize) {
+    case 1:
+      unrolledBitPack1(input, offset, len, output);
+      return;
+    case 2:
+      unrolledBitPack2(input, offset, len, output);
+      return;
+    case 4:
+      unrolledBitPack4(input, offset, len, output);
+      return;
+    case 8:
+      unrolledBitPack8(input, offset, len, output);
+      return;
+    case 16:
+      unrolledBitPack16(input, offset, len, output);
+      return;
+    case 24:
+      unrolledBitPack24(input, offset, len, output);
+      return;
+    case 32:
+      unrolledBitPack32(input, offset, len, output);
+      return;
+    case 40:
+      unrolledBitPack40(input, offset, len, output);
+      return;
+    case 48:
+      unrolledBitPack48(input, offset, len, output);
+      return;
+    case 56:
+      unrolledBitPack56(input, offset, len, output);
+      return;
+    case 64:
+      unrolledBitPack64(input, offset, len, output);
+      return;
+    default:
+      break;
+    }
+
     int bitsLeft = 8;
     byte current = 0;
     for(int i = offset; i < (offset + len); i++) {
@@ -426,6 +503,357 @@ final class SerializationUtils {
     }
   }
 
+  private void unrolledBitPack1(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 1) << 7)
+          | ((input[i + 1] & 1) << 6)
+          | ((input[i + 2] & 1) << 5)
+          | ((input[i + 3] & 1) << 4)
+          | ((input[i + 4] & 1) << 3)
+          | ((input[i + 5] & 1) << 2)
+          | ((input[i + 6] & 1) << 1)
+          | (input[i + 7]) & 1);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 7;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 1) << startShift);
+        startShift -= 1;
+      }
+      output.write(val);
+    }
+  }
+
+  private void unrolledBitPack2(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 4;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 3) << 6)
+          | ((input[i + 1] & 3) << 4)
+          | ((input[i + 2] & 3) << 2)
+          | (input[i + 3]) & 3);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 6;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 3) << startShift);
+        startShift -= 2;
+      }
+      output.write(val);
+    }
+  }
+
+  private void unrolledBitPack4(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 2;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 4;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 15) << startShift);
+        startShift -= 4;
+      }
+      output.write(val);
+    }
+  }
+
+  private void unrolledBitPack8(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 1);
+  }
+
+  private void unrolledBitPack16(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 2);
+  }
+
+  private void unrolledBitPack24(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 3);
+  }
+
+  private void unrolledBitPack32(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 4);
+  }
+
+  private void unrolledBitPack40(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 5);
+  }
+
+  private void unrolledBitPack48(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 6);
+  }
+
+  private void unrolledBitPack56(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 7);
+  }
+
+  private void unrolledBitPack64(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 8);
+  }
+
+  private void unrolledBitPackBytes(long[] input, int offset, int len, OutputStream output, int numBytes) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int i = offset;
+    for (; i < endUnroll; i = i + numHops) {
+      writeLongBE(output, input, i, numHops, numBytes);
+    }
+
+    if (remainder > 0) {
+      writeRemainingLongs(output, i, input, remainder, numBytes);
+    }
+  }
+
+  private void writeRemainingLongs(OutputStream output, int offset, long[] input, int remainder,
+      int numBytes) throws IOException {
+    final int numHops = remainder;
+
+    int idx = 0;
+    switch (numBytes) {
+    case 1:
+      while (remainder > 0) {
+        writeBuffer[idx] = (byte) (input[offset + idx] & 255);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 2:
+      while (remainder > 0) {
+        writeLongBE2(output, input[offset + idx], idx * 2);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 3:
+      while (remainder > 0) {
+        writeLongBE3(output, input[offset + idx], idx * 3);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 4:
+      while (remainder > 0) {
+        writeLongBE4(output, input[offset + idx], idx * 4);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 5:
+      while (remainder > 0) {
+        writeLongBE5(output, input[offset + idx], idx * 5);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 6:
+      while (remainder > 0) {
+        writeLongBE6(output, input[offset + idx], idx * 6);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 7:
+      while (remainder > 0) {
+        writeLongBE7(output, input[offset + idx], idx * 7);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 8:
+      while (remainder > 0) {
+        writeLongBE8(output, input[offset + idx], idx * 8);
+        remainder--;
+        idx++;
+      }
+      break;
+    default:
+      break;
+    }
+
+    final int toWrite = numHops * numBytes;
+    output.write(writeBuffer, 0, toWrite);
+  }
+
+  private void writeLongBE(OutputStream output, long[] input, int offset, int numHops, int numBytes) throws IOException {
+
+    switch (numBytes) {
+    case 1:
+      writeBuffer[0] = (byte) (input[offset + 0] & 255);
+      writeBuffer[1] = (byte) (input[offset + 1] & 255);
+      writeBuffer[2] = (byte) (input[offset + 2] & 255);
+      writeBuffer[3] = (byte) (input[offset + 3] & 255);
+      writeBuffer[4] = (byte) (input[offset + 4] & 255);
+      writeBuffer[5] = (byte) (input[offset + 5] & 255);
+      writeBuffer[6] = (byte) (input[offset + 6] & 255);
+      writeBuffer[7] = (byte) (input[offset + 7] & 255);
+      break;
+    case 2:
+      writeLongBE2(output, input[offset + 0], 0);
+      writeLongBE2(output, input[offset + 1], 2);
+      writeLongBE2(output, input[offset + 2], 4);
+      writeLongBE2(output, input[offset + 3], 6);
+      writeLongBE2(output, input[offset + 4], 8);
+      writeLongBE2(output, input[offset + 5], 10);
+      writeLongBE2(output, input[offset + 6], 12);
+      writeLongBE2(output, input[offset + 7], 14);
+      break;
+    case 3:
+      writeLongBE3(output, input[offset + 0], 0);
+      writeLongBE3(output, input[offset + 1], 3);
+      writeLongBE3(output, input[offset + 2], 6);
+      writeLongBE3(output, input[offset + 3], 9);
+      writeLongBE3(output, input[offset + 4], 12);
+      writeLongBE3(output, input[offset + 5], 15);
+      writeLongBE3(output, input[offset + 6], 18);
+      writeLongBE3(output, input[offset + 7], 21);
+      break;
+    case 4:
+      writeLongBE4(output, input[offset + 0], 0);
+      writeLongBE4(output, input[offset + 1], 4);
+      writeLongBE4(output, input[offset + 2], 8);
+      writeLongBE4(output, input[offset + 3], 12);
+      writeLongBE4(output, input[offset + 4], 16);
+      writeLongBE4(output, input[offset + 5], 20);
+      writeLongBE4(output, input[offset + 6], 24);
+      writeLongBE4(output, input[offset + 7], 28);
+      break;
+    case 5:
+      writeLongBE5(output, input[offset + 0], 0);
+      writeLongBE5(output, input[offset + 1], 5);
+      writeLongBE5(output, input[offset + 2], 10);
+      writeLongBE5(output, input[offset + 3], 15);
+      writeLongBE5(output, input[offset + 4], 20);
+      writeLongBE5(output, input[offset + 5], 25);
+      writeLongBE5(output, input[offset + 6], 30);
+      writeLongBE5(output, input[offset + 7], 35);
+      break;
+    case 6:
+      writeLongBE6(output, input[offset + 0], 0);
+      writeLongBE6(output, input[offset + 1], 6);
+      writeLongBE6(output, input[offset + 2], 12);
+      writeLongBE6(output, input[offset + 3], 18);
+      writeLongBE6(output, input[offset + 4], 24);
+      writeLongBE6(output, input[offset + 5], 30);
+      writeLongBE6(output, input[offset + 6], 36);
+      writeLongBE6(output, input[offset + 7], 42);
+      break;
+    case 7:
+      writeLongBE7(output, input[offset + 0], 0);
+      writeLongBE7(output, input[offset + 1], 7);
+      writeLongBE7(output, input[offset + 2], 14);
+      writeLongBE7(output, input[offset + 3], 21);
+      writeLongBE7(output, input[offset + 4], 28);
+      writeLongBE7(output, input[offset + 5], 35);
+      writeLongBE7(output, input[offset + 6], 42);
+      writeLongBE7(output, input[offset + 7], 49);
+      break;
+    case 8:
+      writeLongBE8(output, input[offset + 0], 0);
+      writeLongBE8(output, input[offset + 1], 8);
+      writeLongBE8(output, input[offset + 2], 16);
+      writeLongBE8(output, input[offset + 3], 24);
+      writeLongBE8(output, input[offset + 4], 32);
+      writeLongBE8(output, input[offset + 5], 40);
+      writeLongBE8(output, input[offset + 6], 48);
+      writeLongBE8(output, input[offset + 7], 56);
+      break;
+      default:
+        break;
+    }
+
+    final int toWrite = numHops * numBytes;
+    output.write(writeBuffer, 0, toWrite);
+  }
+
+  private void writeLongBE2(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE3(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE4(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE5(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE6(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE7(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 48);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 6] =  (byte) (val >>> 0);
+  }
+
+  private void writeLongBE8(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 56);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 48);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 6] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 7] =  (byte) (val >>> 0);
+  }
+
   /**
    * Read bitpacked integers from input stream
    * @param buffer - input buffer
@@ -435,11 +863,49 @@ final class SerializationUtils {
    * @param input - input stream
    * @throws IOException
    */
-  static void readInts(long[] buffer, int offset, int len, int bitSize,
+  void readInts(long[] buffer, int offset, int len, int bitSize,
                        InStream input) throws IOException {
     int bitsLeft = 0;
     int current = 0;
 
+    switch (bitSize) {
+    case 1:
+      unrolledUnPack1(buffer, offset, len, input);
+      return;
+    case 2:
+      unrolledUnPack2(buffer, offset, len, input);
+      return;
+    case 4:
+      unrolledUnPack4(buffer, offset, len, input);
+      return;
+    case 8:
+      unrolledUnPack8(buffer, offset, len, input);
+      return;
+    case 16:
+      unrolledUnPack16(buffer, offset, len, input);
+      return;
+    case 24:
+      unrolledUnPack24(buffer, offset, len, input);
+      return;
+    case 32:
+      unrolledUnPack32(buffer, offset, len, input);
+      return;
+    case 40:
+      unrolledUnPack40(buffer, offset, len, input);
+      return;
+    case 48:
+      unrolledUnPack48(buffer, offset, len, input);
+      return;
+    case 56:
+      unrolledUnPack56(buffer, offset, len, input);
+      return;
+    case 64:
+      unrolledUnPack64(buffer, offset, len, input);
+      return;
+    default:
+      break;
+    }
+
     for(int i = offset; i < (offset + len); i++) {
       long result = 0;
       int bitsLeftToRead = bitSize;
@@ -460,4 +926,362 @@ final class SerializationUtils {
       buffer[i] = result;
     }
   }
+
+
+  private void unrolledUnPack1(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = input.read();
+      buffer[i] = (val >>> 7) & 1;
+      buffer[i + 1] = (val >>> 6) & 1;
+      buffer[i + 2] = (val >>> 5) & 1;
+      buffer[i + 3] = (val >>> 4) & 1;
+      buffer[i + 4] = (val >>> 3) & 1;
+      buffer[i + 5] = (val >>> 2) & 1;
+      buffer[i + 6] = (val >>> 1) & 1;
+      buffer[i + 7] = val & 1;
+    }
+
+    if (remainder > 0) {
+      int startShift = 7;
+      val = input.read();
+      for (int i = endUnroll; i < endOffset; i++) {
+        buffer[i] = (val >>> startShift) & 1;
+        startShift -= 1;
+      }
+    }
+  }
+
+  private void unrolledUnPack2(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    final int numHops = 4;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = input.read();
+      buffer[i] = (val >>> 6) & 3;
+      buffer[i + 1] = (val >>> 4) & 3;
+      buffer[i + 2] = (val >>> 2) & 3;
+      buffer[i + 3] = val & 3;
+    }
+
+    if (remainder > 0) {
+      int startShift = 6;
+      val = input.read();
+      for (int i = endUnroll; i < endOffset; i++) {
+        buffer[i] = (val >>> startShift) & 3;
+        startShift -= 2;
+      }
+    }
+  }
+
+  private void unrolledUnPack4(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    final int numHops = 2;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = input.read();
+      buffer[i] = (val >>> 4) & 15;
+      buffer[i + 1] = val & 15;
+    }
+
+    if (remainder > 0) {
+      int startShift = 4;
+      val = input.read();
+      for (int i = endUnroll; i < endOffset; i++) {
+        buffer[i] = (val >>> startShift) & 15;
+        startShift -= 4;
+      }
+    }
+  }
+
+  private void unrolledUnPack8(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 1);
+  }
+
+  private void unrolledUnPack16(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 2);
+  }
+
+  private void unrolledUnPack24(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 3);
+  }
+
+  private void unrolledUnPack32(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 4);
+  }
+
+  private void unrolledUnPack40(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 5);
+  }
+
+  private void unrolledUnPack48(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 6);
+  }
+
+  private void unrolledUnPack56(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 7);
+  }
+
+  private void unrolledUnPack64(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 8);
+  }
+
+  private void unrolledUnPackBytes(long[] buffer, int offset, int len, InStream input, int numBytes)
+      throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int i = offset;
+    for (; i < endUnroll; i = i + numHops) {
+      readLongBE(input, buffer, i, numHops, numBytes);
+    }
+
+    if (remainder > 0) {
+      readRemainingLongs(buffer, i, input, remainder, numBytes);
+    }
+  }
+
+  private void readRemainingLongs(long[] buffer, int offset, InStream input, int remainder,
+      int numBytes) throws IOException {
+    final int toRead = remainder * numBytes;
+    // bulk read to buffer
+    int bytesRead = input.read(readBuffer, 0, toRead);
+    while (bytesRead != toRead) {
+      bytesRead += input.read(readBuffer, bytesRead, toRead - bytesRead);
+    }
+
+    int idx = 0;
+    switch (numBytes) {
+    case 1:
+      while (remainder > 0) {
+        buffer[offset++] = readBuffer[idx] & 255;
+        remainder--;
+        idx++;
+      }
+      break;
+    case 2:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE2(input, idx * 2);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 3:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE3(input, idx * 3);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 4:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE4(input, idx * 4);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 5:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE5(input, idx * 5);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 6:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE6(input, idx * 6);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 7:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE7(input, idx * 7);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 8:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE8(input, idx * 8);
+        remainder--;
+        idx++;
+      }
+      break;
+    default:
+      break;
+    }
+  }
+
+  private void readLongBE(InStream in, long[] buffer, int start, int numHops, int numBytes)
+      throws IOException {
+    final int toRead = numHops * numBytes;
+    // bulk read to buffer
+    int bytesRead = in.read(readBuffer, 0, toRead);
+    while (bytesRead != toRead) {
+      bytesRead += in.read(readBuffer, bytesRead, toRead - bytesRead);
+    }
+
+    switch (numBytes) {
+    case 1:
+      buffer[start + 0] = readBuffer[0] & 255;
+      buffer[start + 1] = readBuffer[1] & 255;
+      buffer[start + 2] = readBuffer[2] & 255;
+      buffer[start + 3] = readBuffer[3] & 255;
+      buffer[start + 4] = readBuffer[4] & 255;
+      buffer[start + 5] = readBuffer[5] & 255;
+      buffer[start + 6] = readBuffer[6] & 255;
+      buffer[start + 7] = readBuffer[7] & 255;
+      break;
+    case 2:
+      buffer[start + 0] = readLongBE2(in, 0);
+      buffer[start + 1] = readLongBE2(in, 2);
+      buffer[start + 2] = readLongBE2(in, 4);
+      buffer[start + 3] = readLongBE2(in, 6);
+      buffer[start + 4] = readLongBE2(in, 8);
+      buffer[start + 5] = readLongBE2(in, 10);
+      buffer[start + 6] = readLongBE2(in, 12);
+      buffer[start + 7] = readLongBE2(in, 14);
+      break;
+    case 3:
+      buffer[start + 0] = readLongBE3(in, 0);
+      buffer[start + 1] = readLongBE3(in, 3);
+      buffer[start + 2] = readLongBE3(in, 6);
+      buffer[start + 3] = readLongBE3(in, 9);
+      buffer[start + 4] = readLongBE3(in, 12);
+      buffer[start + 5] = readLongBE3(in, 15);
+      buffer[start + 6] = readLongBE3(in, 18);
+      buffer[start + 7] = readLongBE3(in, 21);
+      break;
+    case 4:
+      buffer[start + 0] = readLongBE4(in, 0);
+      buffer[start + 1] = readLongBE4(in, 4);
+      buffer[start + 2] = readLongBE4(in, 8);
+      buffer[start + 3] = readLongBE4(in, 12);
+      buffer[start + 4] = readLongBE4(in, 16);
+      buffer[start + 5] = readLongBE4(in, 20);
+      buffer[start + 6] = readLongBE4(in, 24);
+      buffer[start + 7] = readLongBE4(in, 28);
+      break;
+    case 5:
+      buffer[start + 0] = readLongBE5(in, 0);
+      buffer[start + 1] = readLongBE5(in, 5);
+      buffer[start + 2] = readLongBE5(in, 10);
+      buffer[start + 3] = readLongBE5(in, 15);
+      buffer[start + 4] = readLongBE5(in, 20);
+      buffer[start + 5] = readLongBE5(in, 25);
+      buffer[start + 6] = readLongBE5(in, 30);
+      buffer[start + 7] = readLongBE5(in, 35);
+      break;
+    case 6:
+      buffer[start + 0] = readLongBE6(in, 0);
+      buffer[start + 1] = readLongBE6(in, 6);
+      buffer[start + 2] = readLongBE6(in, 12);
+      buffer[start + 3] = readLongBE6(in, 18);
+      buffer[start + 4] = readLongBE6(in, 24);
+      buffer[start + 5] = readLongBE6(in, 30);
+      buffer[start + 6] = readLongBE6(in, 36);
+      buffer[start + 7] = readLongBE6(in, 42);
+      break;
+    case 7:
+      buffer[start + 0] = readLongBE7(in, 0);
+      buffer[start + 1] = readLongBE7(in, 7);
+      buffer[start + 2] = readLongBE7(in, 14);
+      buffer[start + 3] = readLongBE7(in, 21);
+      buffer[start + 4] = readLongBE7(in, 28);
+      buffer[start + 5] = readLongBE7(in, 35);
+      buffer[start + 6] = readLongBE7(in, 42);
+      buffer[start + 7] = readLongBE7(in, 49);
+      break;
+    case 8:
+      buffer[start + 0] = readLongBE8(in, 0);
+      buffer[start + 1] = readLongBE8(in, 8);
+      buffer[start + 2] = readLongBE8(in, 16);
+      buffer[start + 3] = readLongBE8(in, 24);
+      buffer[start + 4] = readLongBE8(in, 32);
+      buffer[start + 5] = readLongBE8(in, 40);
+      buffer[start + 6] = readLongBE8(in, 48);
+      buffer[start + 7] = readLongBE8(in, 56);
+      break;
+    default:
+      break;
+    }
+  }
+
+  private long readLongBE2(InStream in, int rbOffset) {
+    return (((readBuffer[rbOffset] & 255) << 8)
+        + ((readBuffer[rbOffset + 1] & 255) << 0));
+  }
+
+  private long readLongBE3(InStream in, int rbOffset) {
+    return (((readBuffer[rbOffset] & 255) << 16)
+        + ((readBuffer[rbOffset + 1] & 255) << 8)
+        + ((readBuffer[rbOffset + 2] & 255) << 0));
+  }
+
+  private long readLongBE4(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 24)
+        + ((readBuffer[rbOffset + 1] & 255) << 16)
+        + ((readBuffer[rbOffset + 2] & 255) << 8)
+        + ((readBuffer[rbOffset + 3] & 255) << 0));
+  }
+
+  private long readLongBE5(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 24)
+        + ((readBuffer[rbOffset + 2] & 255) << 16)
+        + ((readBuffer[rbOffset + 3] & 255) << 8)
+        + ((readBuffer[rbOffset + 4] & 255) << 0));
+  }
+
+  private long readLongBE6(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 40)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 2] & 255) << 24)
+        + ((readBuffer[rbOffset + 3] & 255) << 16)
+        + ((readBuffer[rbOffset + 4] & 255) << 8)
+        + ((readBuffer[rbOffset + 5] & 255) << 0));
+  }
+
+  private long readLongBE7(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 48)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 40)
+        + ((long) (readBuffer[rbOffset + 2] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 3] & 255) << 24)
+        + ((readBuffer[rbOffset + 4] & 255) << 16)
+        + ((readBuffer[rbOffset + 5] & 255) << 8)
+        + ((readBuffer[rbOffset + 6] & 255) << 0));
+  }
+
+  private long readLongBE8(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 56)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 48)
+        + ((long) (readBuffer[rbOffset + 2] & 255) << 40)
+        + ((long) (readBuffer[rbOffset + 3] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 4] & 255) << 24)
+        + ((readBuffer[rbOffset + 5] & 255) << 16)
+        + ((readBuffer[rbOffset + 6] & 255) << 8)
+        + ((readBuffer[rbOffset + 7] & 255) << 0));
+  }
+
 }

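The fast path added to readInts() above covers bit widths 1, 2, 4 and the byte multiples 8 through 64: the sub-byte widths unpack eight, four or two values per byte read, and the byte-aligned widths pull each group of eight values with one bulk read into the scratch readBuffer and reassemble them big-endian, instead of one stream read per byte per value. Every other width still falls through to the generic shift-and-mask loop. A minimal standalone sketch of the byte-aligned idea, using a plain java.io.InputStream rather than ORC's InStream (class and method names here are illustrative, not part of this patch):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class Unpack24Sketch {
      // Decode 'count' unsigned 24-bit big-endian integers from 'in' into 'out':
      // the same idea as the 3-byte case of readLongBE()/readLongBE3() above,
      // with simplified error handling.
      static void unpack24(InputStream in, long[] out, int count) throws IOException {
        byte[] scratch = new byte[count * 3];
        int read = 0;
        while (read < scratch.length) {        // one bulk read loop per call
          int n = in.read(scratch, read, scratch.length - read);
          if (n < 0) {
            throw new IOException("stream ended early");
          }
          read += n;
        }
        for (int i = 0; i < count; i++) {
          int o = i * 3;
          out[i] = ((long) (scratch[o] & 255) << 16)
              + ((scratch[o + 1] & 255) << 8)
              + (scratch[o + 2] & 255);
        }
      }

      public static void main(String[] args) throws IOException {
        byte[] packed = {0x01, 0x02, 0x03, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF};
        long[] out = new long[2];
        unpack24(new ByteArrayInputStream(packed), out, 2);
        System.out.println(out[0] + " " + out[1]); // prints 66051 16777215
      }
    }
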
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Jun 17 17:59:25 2014
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
@@ -134,6 +136,7 @@ class WriterImpl implements Writer, Memo
   private final Configuration conf;
   private final OrcFile.WriterCallback callback;
   private final OrcFile.WriterContext callbackContext;
+  private final OrcFile.EncodingStrategy encodingStrategy;
 
   WriterImpl(FileSystem fs,
              Path path,
@@ -146,7 +149,8 @@ class WriterImpl implements Writer, Memo
              MemoryManager memoryManager,
              boolean addBlockPadding,
              OrcFile.Version version,
-             OrcFile.WriterCallback callback) throws IOException {
+             OrcFile.WriterCallback callback,
+             OrcFile.EncodingStrategy encodingStrategy) throws IOException {
     this.fs = fs;
     this.path = path;
     this.conf = conf;
@@ -164,6 +168,7 @@ class WriterImpl implements Writer, Memo
     }
     this.stripeSize = stripeSize;
     this.version = version;
+    this.encodingStrategy = encodingStrategy;
     this.addBlockPadding = addBlockPadding;
     // pick large block size to minimize block over or under hangs
     this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
@@ -404,6 +409,14 @@ class WriterImpl implements Writer, Memo
     }
 
     /**
+     * Get the encoding strategy to use.
+     * @return encoding strategy
+     */
+    public EncodingStrategy getEncodingStrategy() {
+      return encodingStrategy;
+    }
+
+    /**
      * Get the writer's configuration.
      * @return configuration
      */
@@ -497,9 +510,14 @@ class WriterImpl implements Writer, Memo
     }
 
     IntegerWriter createIntegerWriter(PositionedOutputStream output,
-                                      boolean signed, boolean isDirectV2) {
+                                      boolean signed, boolean isDirectV2,
+                                      StreamFactory writer) {
       if (isDirectV2) {
-        return new RunLengthIntegerWriterV2(output, signed);
+        boolean alignedBitpacking = false;
+        if (writer.getEncodingStrategy().equals(EncodingStrategy.SPEED)) {
+          alignedBitpacking = true;
+        }
+        return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
       } else {
         return new RunLengthIntegerWriter(output, signed);
       }
@@ -744,7 +762,7 @@ class WriterImpl implements Writer, Memo
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2);
+      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
       if (inspector instanceof IntObjectInspector) {
         intInspector = (IntObjectInspector) inspector;
         shortInspector = null;
@@ -806,6 +824,7 @@ class WriterImpl implements Writer, Memo
 
   private static class FloatTreeWriter extends TreeWriter {
     private final PositionedOutputStream stream;
+    private final SerializationUtils utils;
 
     FloatTreeWriter(int columnId,
                       ObjectInspector inspector,
@@ -814,6 +833,7 @@ class WriterImpl implements Writer, Memo
       super(columnId, inspector, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
+      this.utils = new SerializationUtils();
       recordPosition(rowIndexPosition);
     }
 
@@ -823,7 +843,7 @@ class WriterImpl implements Writer, Memo
       if (obj != null) {
         float val = ((FloatObjectInspector) inspector).get(obj);
         indexStatistics.updateDouble(val);
-        SerializationUtils.writeFloat(stream, val);
+        utils.writeFloat(stream, val);
       }
     }
 
@@ -844,6 +864,7 @@ class WriterImpl implements Writer, Memo
 
   private static class DoubleTreeWriter extends TreeWriter {
     private final PositionedOutputStream stream;
+    private final SerializationUtils utils;
 
     DoubleTreeWriter(int columnId,
                     ObjectInspector inspector,
@@ -852,6 +873,7 @@ class WriterImpl implements Writer, Memo
       super(columnId, inspector, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
+      this.utils = new SerializationUtils();
       recordPosition(rowIndexPosition);
     }
 
@@ -861,7 +883,7 @@ class WriterImpl implements Writer, Memo
       if (obj != null) {
         double val = ((DoubleObjectInspector) inspector).get(obj);
         indexStatistics.updateDouble(val);
-        SerializationUtils.writeDouble(stream, val);
+        utils.writeDouble(stream, val);
       }
     }
 
@@ -909,15 +931,15 @@ class WriterImpl implements Writer, Memo
       stringOutput = writer.createStream(id,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
       lengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       rowOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), false, isDirectV2);
+          OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(0L);
       buildIndex = writer.buildIndex();
       directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
       directLengthOutput = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
         HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
         HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
@@ -1129,7 +1151,7 @@ class WriterImpl implements Writer, Memo
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.length = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
     }
 
@@ -1188,9 +1210,9 @@ class WriterImpl implements Writer, Memo
       super(columnId, inspector, writer, nullable);
       this.isDirectV2 = isNewWriteFormat(writer);
       this.seconds = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), true, isDirectV2);
+          OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
       this.nanos = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2);
+          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
     }
 
@@ -1261,7 +1283,7 @@ class WriterImpl implements Writer, Memo
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
       this.isDirectV2 = isNewWriteFormat(writer);
-      this.writer = createIntegerWriter(out, true, isDirectV2);
+      this.writer = createIntegerWriter(out, true, isDirectV2, writer);
       recordPosition(rowIndexPosition);
     }
 
@@ -1314,7 +1336,7 @@ class WriterImpl implements Writer, Memo
       this.isDirectV2 = isNewWriteFormat(writer);
       valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
       this.scaleStream = createIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2);
+          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
       recordPosition(rowIndexPosition);
     }
 
@@ -1419,7 +1441,7 @@ class WriterImpl implements Writer, Memo
         createTreeWriter(listObjectInspector.getListElementObjectInspector(),
           writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
     }
 
@@ -1481,7 +1503,7 @@ class WriterImpl implements Writer, Memo
       childrenWriters[1] =
         createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
       lengths = createIntegerWriter(writer.createStream(columnId,
-          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
       recordPosition(rowIndexPosition);
     }
 

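The writer-side changes thread OrcFile.EncodingStrategy from the new hive.exec.orc.encoding.strategy setting down to every IntegerWriter through StreamFactory: with the default SPEED strategy, RunLengthIntegerWriterV2 is created with aligned bit packing (widths rounded up to the sizes the unrolled reader handles), while COMPRESSION keeps the tighter, unaligned widths. A rough sketch of how a caller could opt into the smaller-file behaviour, using only the configuration key and writer options that appear elsewhere in this patch (the path and inspector setup are placeholders):

    Configuration conf = new Configuration();
    // COMPRESSION favours minimal bit widths; the default SPEED favours
    // byte-aligned widths that decode through the unrolled read paths.
    conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
    Writer writer = OrcFile.createWriter(testFilePath,
        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB));
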
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java Tue Jun 17 17:59:25 2014
@@ -20,24 +20,54 @@ package org.apache.hadoop.hive.ql.io.orc
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 
 public class TestBitPack {
 
   private static final int SIZE = 100;
   private static Random rand = new Random(100);
+  Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+      + File.separator + "tmp"));
+
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+
+  @Rule
+  public TestName testCaseName = new TestName();
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc");
+    fs.delete(testFilePath, false);
+  }
 
   private long[] deltaEncode(long[] inp) {
     long[] output = new long[inp.length];
-    for(int i = 0; i < inp.length; i++) {
-      output[i] = SerializationUtils.zigzagEncode(inp[i]);
+    SerializationUtils utils = new SerializationUtils();
+    for (int i = 0; i < inp.length; i++) {
+      output[i] = utils.zigzagEncode(inp[i]);
     }
     return output;
   }
@@ -53,7 +83,7 @@ public class TestBitPack {
 
   private void runTest(int numBits) throws IOException {
     long[] inp = new long[SIZE];
-    for(int i = 0; i < SIZE; i++) {
+    for (int i = 0; i < SIZE; i++) {
       long val = 0;
       if (numBits <= 32) {
         if (numBits == 1) {
@@ -73,24 +103,20 @@ public class TestBitPack {
     long minInput = Collections.min(Longs.asList(deltaEncoded));
     long maxInput = Collections.max(Longs.asList(deltaEncoded));
     long rangeInput = maxInput - minInput;
-    int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+    SerializationUtils utils = new SerializationUtils();
+    int fixedWidth = utils.findClosestNumBits(rangeInput);
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
     OutStream output = new OutStream("test", SIZE, null, collect);
-    SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length,
-        fixedWidth, output);
+    utils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output);
     output.flush();
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     long[] buff = new long[SIZE];
-    SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
-                                InStream.create("test",
-                                                new ByteBuffer[]{inBuf},
-                                                new long[]{0},
-                                                inBuf.remaining(),
-                                                null, SIZE));
-    for(int i = 0; i < SIZE; i++) {
-      buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+    utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf },
+        new long[] { 0 }, inBuf.remaining(), null, SIZE));
+    for (int i = 0; i < SIZE; i++) {
+      buff[i] = utils.zigzagDecode(buff[i]);
     }
     assertEquals(numBits, fixedWidth);
     assertArrayEquals(inp, buff);
@@ -255,4 +281,36 @@ public class TestBitPack {
   public void test64BitPacking64Bit() throws IOException {
     runTest(64);
   }
+
+  @Test
+  public void testBitPack64Large() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestOrcFile.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    int size = 1080832;
+    long[] inp = new long[size];
+    Random rand = new Random(1234);
+    for (int i = 0; i < size; i++) {
+      inp[i] = rand.nextLong();
+    }
+    List<Long> input = Lists.newArrayList(Longs.asList(inp));
+
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf).inspector(inspector).compress(CompressionKind.ZLIB));
+    for (Long l : input) {
+      writer.addRow(l);
+    }
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader rows = reader.rows();
+    int idx = 0;
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      assertEquals(input.get(idx++).longValue(), ((LongWritable) row).get());
+    }
+  }
 }

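A small naming note on the test changes: deltaEncode() in TestBitPack is, despite its name, a zigzag pass; it maps signed longs onto non-negative values so that small magnitudes of either sign need only a few bits before packing. The transform itself is two shifts and an xor; a self-contained version for illustration (not the package-private SerializationUtils methods the test actually calls):

    public class ZigZagSketch {
      // Zigzag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ... so values close to zero
      // become small unsigned numbers regardless of sign.
      static long zigzagEncode(long val) {
        return (val << 1) ^ (val >> 63);
      }

      static long zigzagDecode(long val) {
        return (val >>> 1) ^ -(val & 1);
      }

      public static void main(String[] args) {
        for (long v : new long[] {0, -1, 1, -2, Long.MIN_VALUE, Long.MAX_VALUE}) {
          if (zigzagDecode(zigzagEncode(v)) != v) {
            throw new AssertionError("round trip failed for " + v);
          }
        }
        System.out.println(zigzagEncode(-1) + " " + zigzagEncode(1)); // prints 1 2
      }
    }
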
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java?rev=1603240&r1=1603239&r2=1603240&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java Tue Jun 17 17:59:25 2014
@@ -90,6 +90,7 @@ public class TestFileDump {
       inspector = ObjectInspectorFactory.getReflectionObjectInspector
           (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
+    conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
     Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         100000, CompressionKind.ZLIB, 10000, 10000);
     Random r1 = new Random(1);
@@ -134,6 +135,7 @@ public class TestFileDump {
           (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
     Configuration conf = new Configuration();
+    conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
     conf.setFloat(HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, 0.49f);
     Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
         100000, CompressionKind.ZLIB, 10000, 10000);