You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/08/12 17:03:31 UTC

svn commit: r1513155 [2/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/ ql/...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+
+/**
+ * A reader that reads a sequence of light weight compressed integers. Refer
+ * to {@link RunLengthIntegerWriterV2} for a description of the various
+ * lightweight compression techniques.
+ * <p>
+ * Values are decoded one run at a time into an internal buffer and handed
+ * out by {@link #next()}.
+ * </p>
+ */
+class RunLengthIntegerReaderV2 implements IntegerReader {
+  private final InStream input;
+  // true when the stream holds signed values (zigzag / signed vint encoded)
+  private final boolean signed;
+  // decoded values of the current run, sized like the writer's run buffer
+  private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+  // number of valid entries in literals
+  private int numLiterals = 0;
+  // number of entries in literals already returned or skipped
+  private int used = 0;
+
+  RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException {
+    this.input = input;
+    this.signed = signed;
+  }
+
+  /**
+   * Reads a single header byte.
+   * @return the (non-negative) byte value
+   * @throws EOFException if the stream ends inside a header, instead of
+   *         letting the -1 end-of-stream marker corrupt a length or width
+   */
+  private int readHeaderByte() throws IOException {
+    int b = input.read();
+    if (b < 0) {
+      throw new EOFException("Read past end of RLE integer from " + input);
+    }
+    return b;
+  }
+
+  /**
+   * Decodes the next run into literals, dispatching on the encoding type
+   * stored in the 2 most significant bits of the first header byte.
+   */
+  private void readValues() throws IOException {
+    int firstByte = readHeaderByte();
+    int enc = (firstByte >>> 6) & 0x03;
+    if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
+      readShortRepeatValues(firstByte);
+    } else if (EncodingType.DIRECT.ordinal() == enc) {
+      readDirectValues(firstByte);
+    } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
+      readPatchedBaseValues(firstByte);
+    } else {
+      readDeltaValues(firstByte);
+    }
+  }
+
+  /**
+   * Decodes a DELTA run: a vint first value followed either by a single
+   * fixed delta (encoded width 0) or by a signed delta base plus a bit-packed
+   * blob of absolute deltas whose direction follows the delta base's sign.
+   */
+  private void readDeltaValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits; 0 marks a fixed-delta run
+    int fb = (firstByte >>> 1) & 0x1f;
+    if (fb != 0) {
+      fb = SerializationUtils.decodeBitWidth(fb);
+    }
+
+    // extract the blob run length (one less than the number of values)
+    int len = (firstByte & 0x01) << 8;
+    len |= readHeaderByte();
+
+    // read the first value stored as vint
+    long firstVal;
+    if (signed) {
+      firstVal = SerializationUtils.readVslong(input);
+    } else {
+      firstVal = SerializationUtils.readVulong(input);
+    }
+
+    // store first value to result buffer
+    literals[numLiterals++] = firstVal;
+
+    if (fb == 0) {
+      // fixed delta: read the delta (stored as a signed vint since deltas can
+      // be negative even if all numbers are positive) and apply it len times
+      long fd = SerializationUtils.readVslong(input);
+      for(int i = 0; i < len; i++) {
+        literals[numLiterals] = literals[numLiterals - 1] + fd;
+        numLiterals++;
+      }
+    } else {
+      long deltaBase = SerializationUtils.readVslong(input);
+      // the second value is the first value plus the signed delta base
+      literals[numLiterals++] = firstVal + deltaBase;
+      long prevVal = literals[numLiterals - 1];
+      len -= 1;
+
+      // unpack the absolute deltas in place, then accumulate them onto the
+      // previous value. A negative delta base means a decreasing sequence,
+      // otherwise the sequence is increasing.
+      SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+      while (len > 0) {
+        if (deltaBase < 0) {
+          literals[numLiterals] = prevVal - literals[numLiterals];
+        } else {
+          literals[numLiterals] = prevVal + literals[numLiterals];
+        }
+        prevVal = literals[numLiterals];
+        len--;
+        numLiterals++;
+      }
+    }
+  }
+
+  /**
+   * Decodes a PATCHED_BASE run: a 4 byte header, a fixed-width signed base,
+   * a bit-packed blob of base-reduced values and a bit-packed list of
+   * (gap, patch) entries that restore the high bits of outlier values.
+   */
+  private void readPatchedBaseValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = SerializationUtils.decodeBitWidth(fbo);
+
+    // extract the run length of data blob
+    int len = (firstByte & 0x01) << 8;
+    len |= readHeaderByte();
+    // runs are always one off
+    len += 1;
+
+    // extract the number of bytes occupied by base
+    int thirdByte = readHeaderByte();
+    int bw = (thirdByte >>> 5) & 0x07;
+    // base width is one off
+    bw += 1;
+
+    // extract patch width
+    int pwo = thirdByte & 0x1f;
+    int pw = SerializationUtils.decodeBitWidth(pwo);
+
+    // read fourth byte and extract patch gap width
+    int fourthByte = readHeaderByte();
+    int pgw = (fourthByte >>> 5) & 0x07;
+    // patch gap width is one off
+    pgw += 1;
+
+    // extract the length of the patch list
+    int pl = fourthByte & 0x1f;
+
+    // read the next base width number of bytes to extract base value
+    long base = SerializationUtils.bytesToLongBE(input, bw);
+    long mask = (1L << ((bw * 8) - 1));
+    // if MSB of base value is 1 then base is negative value else positive
+    if ((base & mask) != 0) {
+      base = base & ~mask;
+      base = -base;
+    }
+
+    // unpack the data blob
+    long[] unpacked = new long[len];
+    SerializationUtils.readInts(unpacked, 0, len, fb, input);
+
+    // unpack the patch blob. The writer packs each (gap, patch) entry with
+    // the closest supported fixed bit width, so read with that same width.
+    int patchEntryBits = SerializationUtils.getClosestFixedBits(pw + pgw);
+    long[] unpackedPatch = new long[pl];
+    SerializationUtils.readInts(unpackedPatch, 0, pl, patchEntryBits, input);
+
+    // pw is decoded from a 5 bit width code and can exceed 31 bits, so the
+    // patch mask must be built with a long shift (an int shift wraps at 32)
+    long patchMask = (1L << pw) - 1;
+
+    // apply the patch directly when decoding the packed data
+    int patchIdx = 0;
+    long currGap = unpackedPatch[patchIdx] >>> pw;
+    long currPatch = unpackedPatch[patchIdx] & patchMask;
+    long actualGap = 0;
+
+    // special case: gap is >255 then patch value will be 0.
+    // if gap is <=255 then patch value cannot be 0
+    while (currGap == 255 && currPatch == 0) {
+      actualGap += 255;
+      patchIdx++;
+      currGap = unpackedPatch[patchIdx] >>> pw;
+      currPatch = unpackedPatch[patchIdx] & patchMask;
+    }
+    // add the left over gap
+    actualGap += currGap;
+
+    // unpack data blob, patch it (if required), add base to get final result
+    for(int i = 0; i < unpacked.length; i++) {
+      if (i == actualGap) {
+        // extract the patch value
+        long patchedVal = unpacked[i] | (currPatch << fb);
+
+        // add base to patched value
+        literals[numLiterals++] = base + patchedVal;
+
+        // increment the patch to point to next entry in patch list
+        patchIdx++;
+
+        if (patchIdx < pl) {
+          // read the next gap and patch
+          currGap = unpackedPatch[patchIdx] >>> pw;
+          currPatch = unpackedPatch[patchIdx] & patchMask;
+          actualGap = 0;
+
+          // special case: gap is >255 then patch will be 0. if gap is
+          // <=255 then patch cannot be 0
+          while (currGap == 255 && currPatch == 0) {
+            actualGap += 255;
+            patchIdx++;
+            currGap = unpackedPatch[patchIdx] >>> pw;
+            currPatch = unpackedPatch[patchIdx] & patchMask;
+          }
+          // add the left over gap
+          actualGap += currGap;
+
+          // next gap is relative to the current gap
+          actualGap += i;
+        }
+      } else {
+        // no patching required. add base to unpacked value to get final value
+        literals[numLiterals++] = base + unpacked[i];
+      }
+    }
+
+  }
+
+  /**
+   * Decodes a DIRECT run: values bit packed with a fixed width, zigzag
+   * decoded afterwards when the stream is signed.
+   */
+  private void readDirectValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = SerializationUtils.decodeBitWidth(fbo);
+
+    // extract the run length
+    int len = (firstByte & 0x01) << 8;
+    len |= readHeaderByte();
+    // runs are one off
+    len += 1;
+
+    // write the unpacked values and zigzag decode to result buffer
+    SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+    if (signed) {
+      for(int i = 0; i < len; i++) {
+        literals[numLiterals] = SerializationUtils
+            .zigzagDecode(literals[numLiterals]);
+        numLiterals++;
+      }
+    } else {
+      numLiterals += len;
+    }
+  }
+
+  /**
+   * Decodes a SHORT_REPEAT run: a single value stored in 1-8 big endian
+   * bytes, repeated MIN_REPEAT plus the 3 bit header count times.
+   */
+  private void readShortRepeatValues(int firstByte) throws IOException {
+
+    // read the number of bytes occupied by the value
+    int size = (firstByte >>> 3) & 0x07;
+    // #bytes are one off
+    size += 1;
+
+    // read the run length
+    int len = firstByte & 0x07;
+    // run lengths values are stored only after MIN_REPEAT value is met
+    len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+    // read the repeated value which is stored using fixed bytes
+    long val = SerializationUtils.bytesToLongBE(input, size);
+
+    if (signed) {
+      val = SerializationUtils.zigzagDecode(val);
+    }
+
+    // repeat the value for length times
+    for(int i = 0; i < len; i++) {
+      literals[numLiterals++] = val;
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  /**
+   * Returns the next value, decoding a new run when the buffered one has
+   * been fully consumed.
+   */
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      numLiterals = 0;
+      used = 0;
+      readValues();
+    }
+    result = literals[used++];
+    return result;
+  }
+
+  /**
+   * Seeks the underlying stream, then skips the number of values the
+   * position provider reports as already consumed at that position.
+   */
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two
+      // parts
+      while (consumed > 0) {
+        numLiterals = 0;
+        readValues();
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  /** Skips numValues values, decoding new runs as needed. */
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        numLiterals = 0;
+        used = 0;
+        readValues();
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java Mon Aug 12 15:03:30 2013
@@ -25,7 +25,7 @@ import java.io.IOException;
  * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
  * literal vint values follow.
  */
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
   static final int MIN_REPEAT_SIZE = 3;
   static final int MAX_DELTA = 127;
   static final int MIN_DELTA = -128;
@@ -71,12 +71,14 @@ class RunLengthIntegerWriter {
     }
   }
 
-  void flush() throws IOException {
+  @Override
+  public void flush() throws IOException {
     writeValues();
     output.flush();
   }
 
-  void write(long value) throws IOException {
+  @Override
+  public void write(long value) throws IOException {
     if (numLiterals == 0) {
       literals[numLiterals++] = value;
       tailRunLength = 1;
@@ -130,8 +132,10 @@ class RunLengthIntegerWriter {
     }
   }
 
-  void getPosition(PositionRecorder recorder) throws IOException {
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
     output.getPosition(recorder);
     recorder.addPosition(numLiterals);
   }
+
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs light weight compression over sequence of integers.
+ * <p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * </p>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (run length - MIN_REPEAT)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose bit-width
+ * requirement doesn't vary much.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose bit-width
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required to encode base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.</li>
+ * <li>Data blob - Base reduced values stored using a fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.</li>
+ * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA:</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - encoded as varint</li>
+ * <li>Delta base - encoded as varint</li>
+ * <li>Delta blob - only non-negative (absolute) deltas. Whether the sequence
+ * increases or decreases is decided by the sign of the delta base.</li>
+ * </ul>
+ * </p>
+ */
+class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+  // Run encodings. ordinal() is written into the 2 most significant bits of
+  // each run header (see getOpcode) and compared against by the reader, so
+  // the declaration order is part of the on-disk format.
+  public enum EncodingType {
+    SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+  }
+
+  // size of the literal buffer, i.e. the most values encoded in one run
+  static final int MAX_SCOPE = 512;
+  // minimum SHORT_REPEAT count; the header stores (count - MIN_REPEAT)
+  static final int MIN_REPEAT = 3;
+  // longest run a SHORT_REPEAT header can describe: MIN_REPEAT + 7 (3 bits)
+  private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+  // NOTE(review): not referenced in the visible methods; presumably the last
+  // delta seen by write() — confirm
+  private long prevDelta = 0;
+  // count of repeated values in the current run (SHORT_REPEAT / zero-delta)
+  private int fixedRunLength = 0;
+  // run length used by DIRECT, PATCHED_BASE and non-repeat DELTA headers
+  private int variableRunLength = 0;
+  // values buffered for the current run
+  private final long[] literals = new long[MAX_SCOPE];
+  private final PositionedOutputStream output;
+  // when true values are zigzag / signed-vint encoded
+  private final boolean signed;
+  // encoding chosen by determineEncoding() for the current run
+  private EncodingType encoding;
+  // number of valid entries in literals
+  private int numLiterals;
+  // literals zigzag encoded (raw when unsigned); DIRECT payload
+  private long[] zigzagLiterals;
+  // literals minus min; PATCHED_BASE payload
+  private long[] baseRedLiterals;
+  // first delta (sign kept) followed by absolute adjacent deltas; DELTA payload
+  private long[] adjDeltas;
+  // the common adjacent difference when isFixedDelta is true
+  private long fixedDelta;
+  // bits needed at the 90th / 100th percentile of zigzagLiterals
+  private int zzBits90p;
+  private int zzBits100p;
+  // bits needed at the 95th / 100th percentile of baseRedLiterals
+  private int brBits95p;
+  private int brBits100p;
+  // bits needed for the largest absolute adjacent delta
+  private int bitsDeltaMax;
+  // PATCHED_BASE patch list geometry (computed in preparePatchedBlob)
+  private int patchWidth;
+  private int patchGapWidth;
+  private int patchLength;
+  // packed (gap, patch) entries written after the data blob
+  private long[] gapVsPatchList;
+  // smallest literal of the run; the PATCHED_BASE base value
+  private long min;
+  // true when every adjacent difference in the run is the same
+  private boolean isFixedDelta;
+
+  /**
+   * Creates a writer that encodes runs of integers into a stream.
+   * @param output stream that receives the encoded runs
+   * @param signed whether values are signed and therefore zigzag encoded
+   */
+  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+    this.signed = signed;
+    this.output = output;
+    clear();
+  }
+
+  /**
+   * Emits the buffered literals with the encoding chosen for them and then
+   * resets the per-run state. A no-op when nothing is buffered.
+   */
+  private void writeValues() throws IOException {
+    if (numLiterals == 0) {
+      return;
+    }
+
+    switch (encoding) {
+      case SHORT_REPEAT:
+        writeShortRepeatValues();
+        break;
+      case DIRECT:
+        writeDirectValues();
+        break;
+      case PATCHED_BASE:
+        writePatchedBaseValues();
+        break;
+      default:
+        writeDeltaValues();
+        break;
+    }
+
+    // reset every per-run variable for the next batch
+    clear();
+  }
+
+  /**
+   * Writes a DELTA run: a 2 byte header, the first literal as a vint, then
+   * either the fixed delta (encoded width 0) or the first delta followed by
+   * the bit-packed absolute values of the remaining adjacent deltas.
+   */
+  private void writeDeltaValues() throws IOException {
+    int len = 0;
+    int fb = bitsDeltaMax;
+    // encoded width stays 0 for fixed-delta runs; the reader treats a 0
+    // width as "fixed delta, no blob follows"
+    int efb = 0;
+
+    if (isFixedDelta) {
+      // if fixed run length is greater than threshold then it will be fixed
+      // delta sequence with delta value 0 else fixed delta sequence with
+      // non-zero delta value
+      if (fixedRunLength > MIN_REPEAT) {
+        // ex. sequence: 2 2 2 2 2 2 2 2
+        len = fixedRunLength - 1;
+        fixedRunLength = 0;
+      } else {
+        // ex. sequence: 4 6 8 10 12 14 16
+        len = variableRunLength - 1;
+        variableRunLength = 0;
+      }
+    } else {
+      // fixed width 0 is used for long repeating values.
+      // sequences that require only 1 bit to encode will have an additional bit
+      // (presumably so the encoded width can never collide with the 0 that
+      // marks fixed-delta runs — confirm against SerializationUtils)
+      if (fb == 1) {
+        fb = 2;
+      }
+      efb = SerializationUtils.encodeBitWidth(fb);
+      efb = efb << 1;
+      len = variableRunLength - 1;
+      variableRunLength = 0;
+    }
+
+    // extract the 9th bit of run length
+    int tailBits = (len & 0x100) >>> 8;
+
+    // create first byte of the header
+    int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    int headerSecondByte = len & 0xff;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+
+    // store the first literal as a vint (signed vint for signed streams)
+    if (signed) {
+      SerializationUtils.writeVslong(output, literals[0]);
+    } else {
+      SerializationUtils.writeVulong(output, literals[0]);
+    }
+
+    if (isFixedDelta) {
+      // if delta is fixed then we don't need to store delta blob
+      SerializationUtils.writeVslong(output, fixedDelta);
+    } else {
+      // store the first delta, sign preserved, as a signed vint; its sign
+      // tells the reader whether the sequence increases or decreases
+      SerializationUtils.writeVslong(output, adjDeltas[0]);
+      // the remaining (absolute) adjacent deltas are bit packed
+      SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb,
+          output);
+    }
+  }
+
+  /**
+   * Writes a PATCHED_BASE run: a 4 byte header, the base (min) value in
+   * big endian fixed bytes with its sign in the MSB, the bit-packed
+   * base-reduced literals and the bit-packed (gap, patch) list.
+   * Note: this method overwrites the min field while encoding the base.
+   */
+  private void writePatchedBaseValues() throws IOException {
+
+    // write the number of fixed bits required in next 5 bits
+    int fb = brBits95p;
+    int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+
+    // adjust variable run length, they are one off
+    variableRunLength -= 1;
+
+    // extract the 9th bit of run length
+    int tailBits = (variableRunLength & 0x100) >>> 8;
+
+    // create first byte of the header
+    int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    int headerSecondByte = variableRunLength & 0xff;
+
+    // if the min value is negative toggle the sign
+    // NOTE(review): -Long.MIN_VALUE overflows back to Long.MIN_VALUE; confirm
+    // min can never be Long.MIN_VALUE at this point
+    boolean isNegative = min < 0 ? true : false;
+    if (isNegative) {
+      min = -min;
+    }
+
+    // find the number of bytes required for base and shift it by 5 bits
+    // to accommodate patch width. The additional bit is used to store the sign
+    // of the base value.
+    int baseWidth = SerializationUtils.findClosestNumBits(min) + 1;
+    int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+    int bb = (baseBytes - 1) << 5;
+
+    // if the base value is negative then set MSB to 1
+    if (isNegative) {
+      min |= (1L << ((baseBytes * 8) - 1));
+    }
+
+    // third byte contains 3 bits for number of bytes occupied by base
+    // and 5 bits for patchWidth
+    int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+
+    // fourth byte contains 3 bits for patch gap width (stored one off) and
+    // 5 bits for patch length
+    int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+    output.write(headerThirdByte);
+    output.write(headerFourthByte);
+
+    // write the base value using fixed bytes in big endian order
+    for(int i = baseBytes - 1; i >= 0; i--) {
+      byte b = (byte) ((min >>> (i * 8)) & 0xff);
+      output.write(b);
+    }
+
+    // base reduced literals are bit packed
+    int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
+    SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length,
+        closestFixedBits, output);
+
+    // write patch list; each entry is packed with the closest fixed width
+    // for (gap width + patch width) bits
+    closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
+        + patchWidth);
+    SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length,
+        closestFixedBits, output);
+
+    // reset run length
+    variableRunLength = 0;
+  }
+
+  /**
+   * Store the opcode in 2 MSB bits
+   * @return opcode
+   */
+  private int getOpcode() {
+    return encoding.ordinal() << 6;
+  }
+
+  private void writeDirectValues() throws IOException {
+
+    // write the number of fixed bits required in next 5 bits
+    int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+
+    // adjust variable run length
+    variableRunLength -= 1;
+
+    // extract the 9th bit of run length
+    int tailBits = (variableRunLength & 0x100) >>> 8;
+
+    // create first byte of the header
+    int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    int headerSecondByte = variableRunLength & 0xff;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+
+    // bit packing the zigzag encoded literals
+    SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length,
+        zzBits100p, output);
+
+    // reset run length
+    variableRunLength = 0;
+  }
+
+  private void writeShortRepeatValues() throws IOException {
+    // get the value that is repeating, compute the bits and bytes required
+    long repeatVal = 0;
+    if (signed) {
+      repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+    } else {
+      repeatVal = literals[0];
+    }
+
+    int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
+    int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+        : (numBitsRepeatVal >>> 3) + 1;
+
+    // write encoding type in top 2 bits
+    int header = getOpcode();
+
+    // write the number of bytes required for the value
+    header |= ((numBytesRepeatVal - 1) << 3);
+
+    // write the run length
+    fixedRunLength -= MIN_REPEAT;
+    header |= fixedRunLength;
+
+    // write the header
+    output.write(header);
+
+    // write the repeating value in big endian byte order
+    for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+      int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+      output.write(b);
+    }
+
+    fixedRunLength = 0;
+  }
+
+  /**
+   * Analyzes the buffered literals and picks the encoding for this run.
+   * Along the way it fills in what the chosen writer needs: zigzagLiterals
+   * and zzBits* for DIRECT; baseRedLiterals, brBits* and the patch blob for
+   * PATCHED_BASE; adjDeltas, bitsDeltaMax, fixedDelta and isFixedDelta for
+   * DELTA. Expects numLiterals >= 1.
+   */
+  private void determineEncoding() {
+    // used for direct encoding
+    zigzagLiterals = new long[numLiterals];
+
+    // used for patched base encoding
+    baseRedLiterals = new long[numLiterals];
+
+    // used for delta encoding
+    adjDeltas = new long[numLiterals - 1];
+
+    // for identifying monotonic sequences
+    boolean isIncreasing = false;
+    int increasingCount = 1;
+    boolean isDecreasing = false;
+    int decreasingCount = 1;
+
+    // for identifying type of delta encoding
+    min = literals[0];
+    long max = literals[0];
+    long deltaMax = 0;
+    isFixedDelta = true;
+
+    // the candidate fixed delta is the first adjacent difference. With a
+    // single literal there is no second value (literals[1] would be a stale
+    // slot left over from an earlier run), so use a delta of 0 instead.
+    long currDelta = numLiterals > 1 ? literals[1] - literals[0] : 0;
+    fixedDelta = currDelta;
+
+    // populate all variables to identify the encoding type
+    for(int i = 0; i < numLiterals; i++) {
+      if (i > 0) {
+        // every new running max (min) extends a possible monotonically
+        // increasing (decreasing) sequence
+        if (literals[i] >= max) {
+          max = literals[i];
+          increasingCount++;
+        }
+        if (literals[i] <= min) {
+          min = literals[i];
+          decreasingCount++;
+        }
+
+        // the run stays fixed-delta only while every difference matches
+        if (isFixedDelta && literals[i] - literals[i - 1] != currDelta) {
+          isFixedDelta = false;
+        }
+
+        // the first delta keeps its sign (it gives the direction); later
+        // deltas are absolute and their max sizes the delta blob
+        if (i == 1) {
+          adjDeltas[i - 1] = literals[i] - literals[i - 1];
+        } else {
+          adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
+          if (adjDeltas[i - 1] > deltaMax) {
+            deltaMax = adjDeltas[i - 1];
+          }
+        }
+      }
+
+      // populate zigzag encoded literals
+      if (signed) {
+        zigzagLiterals[i] = SerializationUtils.zigzagEncode(literals[i]);
+      } else {
+        zigzagLiterals[i] = literals[i];
+      }
+    }
+
+    // stores the number of bits required for packing delta blob in
+    // delta encoding
+    bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+
+    // if decreasing count equals total number of literals then the
+    // sequence is monotonically decreasing
+    if (increasingCount == 1 && decreasingCount == numLiterals) {
+      isDecreasing = true;
+    }
+
+    // if increasing count equals total number of literals then the
+    // sequence is monotonically increasing
+    if (decreasingCount == 1 && increasingCount == numLiterals) {
+      isIncreasing = true;
+    }
+
+    // both flags can only be set for a single literal; treat that as not
+    // monotonic
+    if (isDecreasing && isIncreasing) {
+      isDecreasing = false;
+      isIncreasing = false;
+    }
+
+    // monotonic or fixed-delta sequences are delta encoded
+    if (isIncreasing || isDecreasing || isFixedDelta) {
+      encoding = EncodingType.DELTA;
+      return;
+    }
+
+    // From here on the sequence is neither monotonic nor fixed-delta.
+    // Percentile values are computed for the zigzag encoded values. If the
+    // bit requirement between the 90th and 100th percentile varies beyond a
+    // threshold the values are patched, otherwise DIRECT is used.
+    zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, 0.9);
+    zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, 1.0);
+    int diffBitsLH = zzBits100p - zzBits90p;
+
+    if (diffBitsLH > 1) {
+      // patching is done only on base reduced values.
+      // remove base from literals
+      for(int i = 0; i < zigzagLiterals.length; i++) {
+        baseRedLiterals[i] = literals[i] - min;
+      }
+
+      // 95th percentile width is used to determine max allowed value
+      // after which patching will be done
+      brBits95p = SerializationUtils.percentileBits(baseRedLiterals, 0.95);
+
+      // 100th percentile is used to compute the max patch width
+      brBits100p = SerializationUtils.percentileBits(baseRedLiterals, 1.0);
+
+      // The decision to patch was made on zigzag values, but patching is
+      // applied to base reduced literals. If those need no extra bits above
+      // the 95th percentile there is nothing to patch: fall back to DIRECT.
+      if ((brBits100p - brBits95p) != 0) {
+        encoding = EncodingType.PATCHED_BASE;
+        preparePatchedBlob();
+        return;
+      }
+    }
+
+    // bit widths are uniform enough (or patching would not help): DIRECT
+    encoding = EncodingType.DIRECT;
+  }
+
+  /**
+   * Builds the patch list used by PATCHED_BASE encoding from the base
+   * reduced literals.
+   * <p>
+   * Every value wider than {@code brBits95p} bits is truncated in place to
+   * its low {@code brBits95p} bits. Its remaining high bits (the patch) and
+   * its distance from the previously patched index (the gap) are stored in
+   * {@code gapVsPatchList} as {@code (gap << patchWidth) | patch}.
+   * Also sets {@code patchWidth}, {@code patchGapWidth} and
+   * {@code patchLength}.
+   */
+  private void preparePatchedBlob() {
+    // mask will be max value beyond which patch will be generated.
+    // Computed in long arithmetic on purpose: brBits95p can be 32 or more,
+    // and an int shift wraps its distance mod 32 (1 << 32 == 1), which would
+    // give a bogus mask, patch far too many values and overflow the
+    // gapList/patchList arrays below.
+    long mask = (1L << brBits95p) - 1;
+
+    // since we are considering only the 95th percentile, at most 5% of the
+    // values can exceed the mask, which bounds the gap and patch arrays
+    patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+    int[] gapList = new int[patchLength];
+    long[] patchList = new long[patchLength];
+
+    // #bits for patch
+    patchWidth = brBits100p - brBits95p;
+    patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+
+    int gapIdx = 0;
+    int patchIdx = 0;
+    int prev = 0;
+    int gap = 0;
+    int maxGap = 0;
+
+    for(int i = 0; i < baseRedLiterals.length; i++) {
+      // if value is above mask then create the patch and record the gap
+      if (baseRedLiterals[i] > mask) {
+        gap = i - prev;
+        if (gap > maxGap) {
+          maxGap = gap;
+        }
+
+        // gaps are relative, so store the previous patched value index
+        prev = i;
+        gapList[gapIdx++] = gap;
+
+        // extract the most significant bits that are over mask bits
+        long patch = baseRedLiterals[i] >>> brBits95p;
+        patchList[patchIdx++] = patch;
+
+        // strip off the MSB to enable safe bit packing
+        baseRedLiterals[i] &= mask;
+      }
+    }
+
+    // adjust the patch length to number of entries in gap list
+    patchLength = gapIdx;
+
+    // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need at least 1 bit
+    if (maxGap == 0 && patchLength != 0) {
+      patchGapWidth = 1;
+    } else {
+      patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+    }
+
+    // special case: if the max gap is greater than 255, then we need 9 bits
+    // to encode the gap. But we only have 3 bits in the header to record the
+    // gap width, so the gap width is capped at 8 bits. To deal with this
+    // case, the large gap is split over two entries in the patch list:
+    // 255 gap => 0 for patch value
+    // actual gap - 255 => actual patch value
+    // If the element to be patched is the last element in the scope then the
+    // gap can be 511. In this case we will have 3 entries in the patch list:
+    // 255 gap => 0 for patch value
+    // 255 gap => 0 for patch value
+    // 1 gap => actual patch value
+    // NOTE(review): the adjustment below assumes at most one gap exceeds 255,
+    // which holds when at most 512 literals are buffered -- confirm against
+    // MAX_SCOPE.
+    if (patchGapWidth > 8) {
+      patchGapWidth = 8;
+      // for gap = 511, we need two additional entries in patch list
+      if (maxGap == 511) {
+        patchLength += 2;
+      } else {
+        patchLength += 1;
+      }
+    }
+
+    // create gap vs patch list
+    // NOTE(review): each entry packs gap and patch into one long, which
+    // assumes patchGapWidth + patchWidth <= 64; nothing visible here enforces
+    // that -- confirm in determineEncoding/writeValues.
+    gapIdx = 0;
+    patchIdx = 0;
+    gapVsPatchList = new long[patchLength];
+    for(int i = 0; i < patchLength; i++) {
+      long g = gapList[gapIdx++];
+      long p = patchList[patchIdx++];
+      // filler entries (gap 255, patch 0) for gaps that do not fit in 8 bits;
+      // shift a long because patchWidth can be 24 bits or more
+      while (g > 255) {
+        gapVsPatchList[i++] = 255L << patchWidth;
+        g -= 255;
+      }
+
+      // store patch value in LSBs and gap in MSBs
+      gapVsPatchList[i] = (g << patchWidth) | p;
+    }
+  }
+
+  /**
+   * Resets the per-run encoding state so the writer can start buffering a
+   * new run. The buffered count goes back to zero (the literals array itself
+   * is left as is); fixedRunLength and variableRunLength are not touched
+   * here.
+   */
+  private void clear() {
+    // buffer bookkeeping
+    this.numLiterals = 0;
+    this.prevDelta = 0;
+    this.min = 0;
+
+    // chosen encoding and its delta flags
+    this.encoding = null;
+    this.isFixedDelta = false;
+    this.fixedDelta = 0;
+
+    // scratch arrays derived from the literals
+    this.zigzagLiterals = null;
+    this.baseRedLiterals = null;
+    this.adjDeltas = null;
+    this.gapVsPatchList = null;
+
+    // bit width statistics
+    this.zzBits90p = 0;
+    this.zzBits100p = 0;
+    this.brBits95p = 0;
+    this.brBits100p = 0;
+    this.bitsDeltaMax = 0;
+
+    // patching parameters
+    this.patchGapWidth = 0;
+    this.patchLength = 0;
+    this.patchWidth = 0;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (numLiterals != 0) {
+      if (variableRunLength != 0) {
+        determineEncoding();
+        writeValues();
+      } else if (fixedRunLength != 0) {
+        if (fixedRunLength < MIN_REPEAT) {
+          variableRunLength = fixedRunLength;
+          fixedRunLength = 0;
+          determineEncoding();
+          writeValues();
+        } else if (fixedRunLength >= MIN_REPEAT
+            && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+          encoding = EncodingType.SHORT_REPEAT;
+          writeValues();
+        } else {
+          encoding = EncodingType.DELTA;
+          isFixedDelta = true;
+          writeValues();
+        }
+      }
+    }
+    output.flush();
+  }
+
+  /**
+   * Buffers one value and tracks whether the tail of the buffer is a run of
+   * identical values (fixedRunLength) or of varying values
+   * (variableRunLength). Completed runs are handed to determineEncoding()
+   * and/or writeValues().
+   *
+   * @param val the value to append
+   * @throws IOException propagated from writing out buffered runs
+   */
+  @Override
+  public void write(long val) throws IOException {
+    if (numLiterals == 0) {
+      // first value of a fresh buffer
+      initializeLiterals(val);
+    } else {
+      if (numLiterals == 1) {
+        prevDelta = val - literals[0];
+        literals[numLiterals++] = val;
+        // if both values are same count as fixed run else variable run
+        if (val == literals[0]) {
+          fixedRunLength = 2;
+          variableRunLength = 0;
+        } else {
+          fixedRunLength = 0;
+          variableRunLength = 2;
+        }
+      } else {
+        long currentDelta = val - literals[numLiterals - 1];
+        if (prevDelta == 0 && currentDelta == 0) {
+          // repeated value: the last two buffered values and val are equal
+
+          literals[numLiterals++] = val;
+
+          // if variable run is non-zero then we are seeing repeating
+          // values at the end of variable run in which case keep
+          // updating variable and fixed runs. The fixed run then counts
+          // the two equal values already at the tail plus val.
+          if (variableRunLength > 0) {
+            fixedRunLength = 2;
+          }
+          fixedRunLength += 1;
+
+          // if fixed run met the minimum condition and if variable
+          // run is non-zero then flush the variable run and shift the
+          // tail fixed runs to start of the buffer
+          if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+            numLiterals -= MIN_REPEAT;
+            // val was appended in this branch without being counted in
+            // variableRunLength, so only MIN_REPEAT - 1 of the moved values
+            // are removed from the variable run
+            variableRunLength -= MIN_REPEAT - 1;
+            // copy the tail fixed runs
+            long[] tailVals = new long[MIN_REPEAT];
+            System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+            // determine variable encoding and flush values
+            determineEncoding();
+            writeValues();
+
+            // shift tail fixed runs to beginning of the buffer
+            // (relies on writeValues() leaving numLiterals at 0, presumably
+            // via clear() -- not visible here)
+            for(long l : tailVals) {
+              literals[numLiterals++] = l;
+            }
+          }
+
+          // if fixed runs reached max repeat length then write values
+          if (fixedRunLength == MAX_SCOPE) {
+            determineEncoding();
+            writeValues();
+          }
+        } else {
+          // variable delta run
+
+          // if fixed run length is non-zero and if it satisfies the
+          // short repeat conditions then write the values as short repeats
+          // else use delta encoding
+          if (fixedRunLength >= MIN_REPEAT) {
+            if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+              encoding = EncodingType.SHORT_REPEAT;
+              writeValues();
+            } else {
+              encoding = EncodingType.DELTA;
+              isFixedDelta = true;
+              writeValues();
+            }
+          }
+
+          // if fixed run length is <MIN_REPEAT and current value is
+          // different from previous then treat it as variable run
+          if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+            if (val != literals[numLiterals - 1]) {
+              variableRunLength = fixedRunLength;
+              fixedRunLength = 0;
+            }
+          }
+
+          // after writing values re-initialize the variables
+          // (numLiterals is expected to be 0 when a run was just written)
+          if (numLiterals == 0) {
+            initializeLiterals(val);
+          } else {
+            // keep updating variable run lengths
+            prevDelta = val - literals[numLiterals - 1];
+            literals[numLiterals++] = val;
+            variableRunLength += 1;
+
+            // if variable run length reach the max scope, write it
+            if (variableRunLength == MAX_SCOPE) {
+              determineEncoding();
+              writeValues();
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Starts a new buffer with {@code val} as its first literal. A lone value
+   * counts as a run of length one for both run kinds.
+   */
+  private void initializeLiterals(long val) {
+    literals[numLiterals++] = val;
+    variableRunLength = fixedRunLength = 1;
+  }
+
+  /**
+   * Records the current position: first the position of the underlying
+   * output stream, then how many literals are still buffered in this writer.
+   */
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    this.output.getPosition(recorder);
+    recorder.addPosition(this.numLiterals);
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Mon Aug 12 15:03:30 2013
@@ -185,4 +185,279 @@ final class SerializationUtils {
     result = result.shiftRight(1);
     return result;
   }
+
+  enum FixedBitSizes {
+    ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+    THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+    TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+    TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+  }
+
+  /**
+   * Count the number of bits required to encode the given value, rounded up
+   * to the closest supported fixed bit width.
+   * <p>
+   * The value is treated as an unsigned 64 bit pattern: 0 needs 1 bit, and a
+   * negative long (most significant bit set) needs 64 bits.
+   * @param value
+   * @return bits required to store value
+   */
+  static int findClosestNumBits(long value) {
+    // Position of the highest set bit. The previous "while (value > 0)" loop
+    // stopped immediately for negative values (e.g. the zigzag encoding of a
+    // large-magnitude negative number) and reported 1 bit, so packing such a
+    // value would silently drop its high bits.
+    int count = Long.SIZE - Long.numberOfLeadingZeros(value);
+    return getClosestFixedBits(count);
+  }
+
+  /**
+   * zigzag encode the given value, interleaving signed values onto unsigned
+   * ones so small magnitudes stay small: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3 ...
+   * @param val
+   * @return zigzag encoded value
+   */
+  static long zigzagEncode(long val) {
+    // all ones for a negative input, all zeros otherwise
+    long signMask = val >> 63;
+    return signMask ^ (val << 1);
+  }
+
+  /**
+   * zigzag decode the given value; the inverse of zigzagEncode:
+   * 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2 ...
+   * @param val
+   * @return zigzag decoded value
+   */
+  static long zigzagDecode(long val) {
+    long magnitude = val >>> 1;
+    // a set low bit marks a negative original: flip every bit
+    long signMask = -(val & 1);
+    return signMask ^ magnitude;
+  }
+
+  /**
+   * Compute the bits required to represent the pth percentile value.
+   * <p>
+   * Each value is bucketed by its closest fixed bit width. Walking down from
+   * the widest bucket, the width returned is the first one at which more
+   * than {@code (int) (data.length * (1.0 - p))} values have been seen.
+   * @param data - array
+   * @param p - percentile value, must be in (0.0, 1.0]
+   * @return pth percentile bits; -1 if p is outside (0.0, 1.0], 0 if data
+   *         is empty
+   */
+  static int percentileBits(long[] data, double p) {
+    if (p <= 0.0 || p > 1.0) {
+      return -1;
+    }
+
+    // one counter per FixedBitSizes constant (32 distinct encodable widths)
+    int[] widthCounts = new int[32];
+    for (int k = 0; k < data.length; k++) {
+      widthCounts[encodeBitWidth(findClosestNumBits(data[k]))]++;
+    }
+
+    // number of values allowed to be wider than the returned width
+    int remaining = (int) (data.length * (1.0 - p));
+    int bucket = widthCounts.length - 1;
+    while (bucket >= 0) {
+      remaining -= widthCounts[bucket];
+      if (remaining < 0) {
+        return decodeBitWidth(bucket);
+      }
+      bucket--;
+    }
+    return 0;
+  }
+
+  /**
+   * Read n bytes in big endian order and convert to long
+   * @param input - stream to read the bytes from
+   * @param n - number of bytes to read
+   * @return long value assembled from the n bytes, most significant first
+   * @throws java.io.EOFException if the stream ends before n bytes are read
+   * @throws IOException on other read failures
+   */
+  static long bytesToLongBE(InStream input, int n) throws IOException {
+    long out = 0;
+    while (n > 0) {
+      n--;
+      // read() reports end of stream as -1 (assuming InStream follows the
+      // InputStream contract); OR-ing that in would set every bit above the
+      // current byte and silently corrupt the result
+      int b = input.read();
+      if (b == -1) {
+        throw new java.io.EOFException(
+            "Read past end of stream while reading a big endian value");
+      }
+      // widen to long before shifting, else integer overflow will occur
+      out |= ((long) b << (n * 8));
+    }
+    return out;
+  }
+
+  /**
+   * Calculate the number of whole bytes needed to hold {@code n} values of
+   * {@code numBits} bits each when bit packed back to back; the last byte
+   * may be only partially used.
+   * @param n - number of values
+   * @param numBits - bit width
+   * @return number of bytes required
+   */
+  static int getTotalBytesRequired(int n, int numBits) {
+    int totalBits = n * numBits;
+    // round up to the next multiple of a byte
+    return (totalBits + Byte.SIZE - 1) / Byte.SIZE;
+  }
+
+  /**
+   * Rounds a bit count up to the closest width in FixedBitSizes: 0 maps to 1,
+   * 1..24 map to themselves, 25..32 round up to an even width, 33..56 round
+   * up to a multiple of 8, and anything larger (or negative) maps to 64.
+   * @param n
+   * @return closest valid fixed bit
+   */
+  static int getClosestFixedBits(int n) {
+    if (n == 0) {
+      return 1;
+    }
+    if (n >= 1 && n <= 24) {
+      return n;
+    }
+    if (n >= 25 && n <= 32) {
+      // 26, 28, 30 or 32: bump odd counts to the next even width
+      return n + (n & 1);
+    }
+    if (n >= 33 && n <= 56) {
+      // 40, 48 or 56: round up to the next multiple of 8
+      return (n + 7) & ~7;
+    }
+    return 64;
+  }
+
+  /**
+   * Finds the closest available fixed bit width match and returns its encoded
+   * value (ordinal)
+   * @param n - fixed bit width to encode
+   * @return encoded fixed bit width
+   */
+  static int encodeBitWidth(int n) {
+    n = getClosestFixedBits(n);
+
+    if (n >= 1 && n <= 24) {
+      return n - 1;
+    } else if (n > 24 && n <= 26) {
+      return FixedBitSizes.TWENTYSIX.ordinal();
+    } else if (n > 26 && n <= 28) {
+      return FixedBitSizes.TWENTYEIGHT.ordinal();
+    } else if (n > 28 && n <= 30) {
+      return FixedBitSizes.THIRTY.ordinal();
+    } else if (n > 30 && n <= 32) {
+      return FixedBitSizes.THIRTYTWO.ordinal();
+    } else if (n > 32 && n <= 40) {
+      return FixedBitSizes.FORTY.ordinal();
+    } else if (n > 40 && n <= 48) {
+      return FixedBitSizes.FORTYEIGHT.ordinal();
+    } else if (n > 48 && n <= 56) {
+      return FixedBitSizes.FIFTYSIX.ordinal();
+    } else {
+      return FixedBitSizes.SIXTYFOUR.ordinal();
+    }
+  }
+
+  /**
+   * Maps a FixedBitSizes ordinal back to the bit width it stands for. Values
+   * that are not a valid ordinal decode to 64.
+   * @param n - encoded fixed bit width
+   * @return decoded fixed bit width
+   */
+  static int decodeBitWidth(int n) {
+    FixedBitSizes[] sizes = FixedBitSizes.values();
+    if (n < 0 || n >= sizes.length) {
+      return 64;
+    }
+    switch (sizes[n]) {
+      case TWENTYSIX:
+        return 26;
+      case TWENTYEIGHT:
+        return 28;
+      case THIRTY:
+        return 30;
+      case THIRTYTWO:
+        return 32;
+      case FORTY:
+        return 40;
+      case FORTYEIGHT:
+        return 48;
+      case FIFTYSIX:
+        return 56;
+      case SIXTYFOUR:
+        return 64;
+      default:
+        // ONE .. TWENTYFOUR: ordinal n stands for width n + 1
+        return n + 1;
+    }
+  }
+
+  /**
+   * Bitpack and write the input values to underlying output stream
+   * @param input - values to write
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param output - output stream
+   * @throws IOException
+   */
+  static void writeInts(long[] input, int offset, int len, int bitSize,
+                        OutputStream output) throws IOException {
+    if (input == null || input.length < 1 || offset < 0 || len < 1
+        || bitSize < 1) {
+      return;
+    }
+
+    int bitsLeft = 8;
+    byte current = 0;
+    for(int i = offset; i < (offset + len); i++) {
+      long value = input[i];
+      int bitsToWrite = bitSize;
+      while (bitsToWrite > bitsLeft) {
+        // add the bits to the bottom of the current word
+        current |= value >>> (bitsToWrite - bitsLeft);
+        // subtract out the bits we just added
+        bitsToWrite -= bitsLeft;
+        // zero out the bits above bitsToWrite
+        value &= (1L << bitsToWrite) - 1;
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+      bitsLeft -= bitsToWrite;
+      current |= value << bitsLeft;
+      if (bitsLeft == 0) {
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+    }
+
+    // flush
+    if (bitsLeft != 8) {
+      output.write(current);
+      current = 0;
+      bitsLeft = 8;
+    }
+  }
+
+  /**
+   * Read bitpacked integers from input stream.
+   * <p>
+   * Reads {@code len} values of {@code bitSize} bits each, most significant
+   * bit first, into {@code buffer[offset .. offset + len)}. Reading starts
+   * at a byte boundary, and any unused low bits of the last byte consumed
+   * are discarded.
+   * @param buffer - buffer receiving the decoded values
+   * @param offset - index in buffer of the first value to store
+   * @param len - number of values to read
+   * @param bitSize - bit width
+   * @param input - input stream
+   * @throws java.io.EOFException if the stream ends before all values are
+   *         read
+   * @throws IOException on other read failures
+   */
+  static void readInts(long[] buffer, int offset, int len, int bitSize,
+                       InStream input) throws IOException {
+    int bitsLeft = 0;
+    int current = 0;
+
+    for(int i = offset; i < (offset + len); i++) {
+      long result = 0;
+      int bitsLeftToRead = bitSize;
+      while (bitsLeftToRead > bitsLeft) {
+        result <<= bitsLeft;
+        result |= current & ((1 << bitsLeft) - 1);
+        bitsLeftToRead -= bitsLeft;
+        current = input.read();
+        // read() reports end of stream as -1 (assuming InStream follows the
+        // InputStream contract); decoding it would yield a byte of all ones
+        // and silently corrupt the value
+        if (current == -1) {
+          throw new java.io.EOFException(
+              "Read past end of stream while reading bit packed integers");
+        }
+        bitsLeft = 8;
+      }
+
+      // handle the left over bits
+      if (bitsLeftToRead > 0) {
+        result <<= bitsLeftToRead;
+        bitsLeft -= bitsLeftToRead;
+        result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+      }
+      buffer[i] = result;
+    }
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Mon Aug 12 15:03:30 2013
@@ -381,6 +381,7 @@ class WriterImpl implements Writer, Memo
     private final PositionedOutputStream rowIndexStream;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
+    protected final boolean useDirectV2Encoding;
 
     /**
      * Create a tree writer.
@@ -396,6 +397,7 @@ class WriterImpl implements Writer, Memo
       this.isCompressed = streamFactory.isCompressed();
       this.id = columnId;
       this.inspector = inspector;
+      this.useDirectV2Encoding = true;
       if (nullable) {
         isPresentOutStream = streamFactory.createStream(id,
             OrcProto.Stream.Kind.PRESENT);
@@ -430,6 +432,32 @@ class WriterImpl implements Writer, Memo
       return rowIndexEntry;
     }
 
+    /**
+     * Creates the integer run-length writer for a stream: the V2 writer when
+     * {@code isDirectV2} is set, otherwise the original writer.
+     */
+    IntegerWriter createIntegerWriter(PositionedOutputStream output,
+                                      boolean signed, boolean isDirectV2) {
+      return isDirectV2
+          ? new RunLengthIntegerWriterV2(output, signed)
+          : new RunLengthIntegerWriter(output, signed);
+    }
+
+    /**
+     * Decides from HIVE_ORC_WRITE_FORMAT whether the newer (V2) encodings are
+     * used: an unset value selects the new format (with a warning), a value
+     * equal to the option's default selects the 0.11 format, and any other
+     * value selects the new format.
+     * NOTE(review): called from each tree writer's constructor, so the choice
+     * is logged once per column writer.
+     */
+    boolean isNewWriteFormat(StreamFactory writer) {
+      String writeFormat = writer.getConfiguration().get(
+          HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+      if (writeFormat == null) {
+        LOG.warn("ORC write format not defined. Using 0.12 ORC write format.");
+      } else if (writeFormat.equals(
+          HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.defaultVal)) {
+        LOG.info("Using 0.11 ORC write format.");
+        return false;
+      } else {
+        LOG.info("Using 0.12 ORC write format.");
+      }
+      return true;
+    }
+
     /**
      * Add a new value to the column.
      * @param obj
@@ -635,10 +663,11 @@ class WriterImpl implements Writer, Memo
   }
 
   private static class IntegerTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter writer;
+    private final IntegerWriter writer;
     private final ShortObjectInspector shortInspector;
     private final IntObjectInspector intInspector;
     private final LongObjectInspector longInspector;
+    private boolean isDirectV2 = true;
 
     IntegerTreeWriter(int columnId,
                       ObjectInspector inspector,
@@ -647,7 +676,8 @@ class WriterImpl implements Writer, Memo
       super(columnId, inspector, writer, nullable);
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
-      this.writer = new RunLengthIntegerWriter(out, true);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.writer = createIntegerWriter(out, true, isDirectV2);
       if (inspector instanceof IntObjectInspector) {
         intInspector = (IntObjectInspector) inspector;
         shortInspector = null;
@@ -666,6 +696,16 @@ class WriterImpl implements Writer, Memo
     }
 
     @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // DIRECT_V2 when the writer was created for the new write format
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
+
+    @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
@@ -776,13 +816,13 @@ class WriterImpl implements Writer, Memo
   private static class StringTreeWriter extends TreeWriter {
     private static final int INITIAL_DICTIONARY_SIZE = 4096;
     private final OutStream stringOutput;
-    private final RunLengthIntegerWriter lengthOutput;
-    private final RunLengthIntegerWriter rowOutput;
+    private final IntegerWriter lengthOutput;
+    private final IntegerWriter rowOutput;
     private final StringRedBlackTree dictionary =
         new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
     private final DynamicIntArray rows = new DynamicIntArray();
     private final PositionedOutputStream directStreamOutput;
-    private final RunLengthIntegerWriter directLengthOutput;
+    private final IntegerWriter directLengthOutput;
     private final List<OrcProto.RowIndexEntry> savedRowIndex =
         new ArrayList<OrcProto.RowIndexEntry>();
     private final boolean buildIndex;
@@ -791,25 +831,26 @@ class WriterImpl implements Writer, Memo
     //the total number of non-null rows, turn off dictionary encoding
     private final float dictionaryKeySizeThreshold;
     private boolean useDictionaryEncoding = true;
+    private boolean isDirectV2 = true;
 
     StringTreeWriter(int columnId,
                      ObjectInspector inspector,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
       stringOutput = writer.createStream(id,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
-      lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false);
-      rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), false);
+      lengthOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+      rowOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA), false, isDirectV2);
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(0L);
       buildIndex = writer.buildIndex();
       directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      directLengthOutput =
-        new RunLengthIntegerWriter(writer.createStream
-                                    (id, OrcProto.Stream.Kind.LENGTH), false);
+      directLengthOutput = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
       dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
         HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
         HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
@@ -906,24 +947,23 @@ class WriterImpl implements Writer, Memo
       rowIndexValueCount.add(0L);
     }
 
-    // Calls getPosition on the row output stream if dictionary encoding is used, and the direct
-    // output stream if direct encoding is used
-    private void recordOutputPosition(OrcProto.RowIndexEntry.Builder base) throws IOException {
-      if (useDictionaryEncoding) {
-        rowOutput.getPosition(new RowIndexPositionRecorder(base));
-      } else {
-        directStreamOutput.getPosition(new RowIndexPositionRecorder(base));
-      }
-    }
-
     @Override
     OrcProto.ColumnEncoding getEncoding() {
       // Returns the encoding used for the last call to writeStripe
       if (useDictionaryEncoding) {
+        if(isDirectV2) {
+          return OrcProto.ColumnEncoding.newBuilder().setKind(
+              OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+              setDictionarySize(dictionary.size()).build();
+        }
         return OrcProto.ColumnEncoding.newBuilder().setKind(
             OrcProto.ColumnEncoding.Kind.DICTIONARY).
             setDictionarySize(dictionary.size()).build();
       } else {
+        if(isDirectV2) {
+          return OrcProto.ColumnEncoding.newBuilder().setKind(
+              OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+        }
         return OrcProto.ColumnEncoding.newBuilder().setKind(
             OrcProto.ColumnEncoding.Kind.DIRECT).build();
       }
@@ -956,7 +996,8 @@ class WriterImpl implements Writer, Memo
 
   private static class BinaryTreeWriter extends TreeWriter {
     private final PositionedOutputStream stream;
-    private final RunLengthIntegerWriter length;
+    private final IntegerWriter length;
+    private boolean isDirectV2 = true;
 
     BinaryTreeWriter(int columnId,
                      ObjectInspector inspector,
@@ -965,12 +1006,23 @@ class WriterImpl implements Writer, Memo
       super(columnId, inspector, writer, nullable);
       this.stream = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
-      this.length = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.LENGTH), false);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.length = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
       recordPosition(rowIndexPosition);
     }
 
     @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // DIRECT_V2 when the writer was created for the new write format
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
+
+    @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
@@ -1003,22 +1055,34 @@ class WriterImpl implements Writer, Memo
       Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
 
   private static class TimestampTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter seconds;
-    private final RunLengthIntegerWriter nanos;
+    private final IntegerWriter seconds;
+    private final IntegerWriter nanos;
+    private final boolean isDirectV2;
 
     TimestampTreeWriter(int columnId,
                      ObjectInspector inspector,
                      StreamFactory writer,
                      boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
-      this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.DATA), true);
-      this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), false);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.seconds = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.DATA), true, isDirectV2);
+      this.nanos = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), false, isDirectV2);
       recordPosition(rowIndexPosition);
     }
 
     @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // DIRECT_V2 when the writer was created for the new write format
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
+
+    @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
@@ -1064,7 +1128,8 @@ class WriterImpl implements Writer, Memo
   }
 
   private static class DateTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter writer;
+    private final IntegerWriter writer;
+    private final boolean isDirectV2;
 
     DateTreeWriter(int columnId,
                    ObjectInspector inspector,
@@ -1073,7 +1138,8 @@ class WriterImpl implements Writer, Memo
       super(columnId, inspector, writer, nullable);
       PositionedOutputStream out = writer.createStream(id,
           OrcProto.Stream.Kind.DATA);
-      this.writer = new RunLengthIntegerWriter(out, true);
+      this.isDirectV2 = isNewWriteFormat(writer);
+      this.writer = createIntegerWriter(out, true, isDirectV2);
       recordPosition(rowIndexPosition);
     }
 
@@ -1101,24 +1167,46 @@ class WriterImpl implements Writer, Memo
       super.recordPosition(recorder);
       writer.getPosition(recorder);
     }
+
+    @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // DIRECT_V2 when the writer was created for the new write format
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
   }
 
   private static class DecimalTreeWriter extends TreeWriter {
     private final PositionedOutputStream valueStream;
-    private final RunLengthIntegerWriter scaleStream;
+    private final IntegerWriter scaleStream;
+    private final boolean isDirectV2;
 
     DecimalTreeWriter(int columnId,
                         ObjectInspector inspector,
                         StreamFactory writer,
                         boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
       valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-      scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
-          OrcProto.Stream.Kind.SECONDARY), true);
+      this.scaleStream = createIntegerWriter(writer.createStream(id,
+          OrcProto.Stream.Kind.SECONDARY), true, isDirectV2);
       recordPosition(rowIndexPosition);
     }
 
     @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // DIRECT_V2 when the writer was created for the new write format
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
+
+    @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
@@ -1191,25 +1279,36 @@ class WriterImpl implements Writer, Memo
   }
 
   private static class ListTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter lengths;
+    private final IntegerWriter lengths;
+    private final boolean isDirectV2;
 
     ListTreeWriter(int columnId,
                    ObjectInspector inspector,
                    StreamFactory writer,
                    boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer);
       ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
       childrenWriters = new TreeWriter[1];
       childrenWriters[0] =
         createTreeWriter(listObjectInspector.getListElementObjectInspector(),
           writer, true);
-      lengths =
-        new RunLengthIntegerWriter(writer.createStream(columnId,
-            OrcProto.Stream.Kind.LENGTH), false);
+      lengths = createIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
       recordPosition(rowIndexPosition);
     }
 
     @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // DIRECT_V2 when the writer was created for the new write format
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
+
+    @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
@@ -1241,26 +1340,37 @@ class WriterImpl implements Writer, Memo
   }
 
   private static class MapTreeWriter extends TreeWriter {
-    private final RunLengthIntegerWriter lengths;
+    private final IntegerWriter lengths; // writer for this column's LENGTH stream
+    private final boolean isDirectV2; // selects DIRECT_V2 vs DIRECT and the LENGTH writer variant
 
     MapTreeWriter(int columnId,
                   ObjectInspector inspector,
                   StreamFactory writer,
                   boolean nullable) throws IOException {
       super(columnId, inspector, writer, nullable);
+      this.isDirectV2 = isNewWriteFormat(writer); // must be set before the LENGTH writer below
       MapObjectInspector insp = (MapObjectInspector) inspector;
       childrenWriters = new TreeWriter[2];
       childrenWriters[0] =
         createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
       childrenWriters[1] =
         createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
-      lengths =
-        new RunLengthIntegerWriter(writer.createStream(columnId,
-            OrcProto.Stream.Kind.LENGTH), false);
+      lengths = createIntegerWriter(writer.createStream(columnId,
+          OrcProto.Stream.Kind.LENGTH), false, isDirectV2); // writer variant follows isDirectV2
       recordPosition(rowIndexPosition);
     }
 
     @Override
+    OrcProto.ColumnEncoding getEncoding() {
+      // The encoding carries only a kind: DIRECT_V2 when isDirectV2 is set,
+      // otherwise the original DIRECT kind.
+      OrcProto.ColumnEncoding.Kind kind = isDirectV2
+          ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+          : OrcProto.ColumnEncoding.Kind.DIRECT;
+      return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+    }
+
+    @Override
     void write(Object obj) throws IOException {
       super.write(obj);
       if (obj != null) {
@@ -1346,8 +1456,7 @@ class WriterImpl implements Writer, Memo
 
   private static TreeWriter createTreeWriter(ObjectInspector inspector,
                                              StreamFactory streamFactory,
-                                             boolean nullable
-                                            ) throws IOException {
+                                             boolean nullable) throws IOException {
     switch (inspector.getCategory()) {
       case PRIMITIVE:
         switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {

Modified: hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto (original)
+++ hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto Mon Aug 12 15:03:30 2013
@@ -73,6 +73,8 @@ message ColumnEncoding {
   enum Kind {
     DIRECT = 0;
     DICTIONARY = 1;
+    DIRECT_V2 = 2;     // reported by WriterImpl tree writers when isDirectV2 is set
+    DICTIONARY_V2 = 3; // presumably the dictionary counterpart of DIRECT_V2 -- confirm
   }
   required Kind kind = 1;
   optional uint32 dictionarySize = 2;

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+/**
+ * Round-trip tests for {@link SerializationUtils#writeInts} and
+ * {@link SerializationUtils#readInts}: random values whose zigzag encoding
+ * packs at a known width are written, read back and compared.
+ */
+public class TestBitPack {
+
+  private static final int SIZE = 100;
+
+  /**
+   * Seed for the test data. Each run creates its own {@link Random} from it
+   * so the data does not depend on the (unspecified) order in which JUnit
+   * runs the test methods.
+   */
+  private static final long SEED = 100;
+
+  /** Returns a new array holding the zigzag encoding of each input value. */
+  private long[] zigzagEncode(long[] inp) {
+    long[] output = new long[inp.length];
+    for (int i = 0; i < inp.length; i++) {
+      output[i] = SerializationUtils.zigzagEncode(inp[i]);
+    }
+    return output;
+  }
+
+  /**
+   * Returns a uniformly distributed value in [0, n) for n > 0, using the same
+   * rejection scheme as {@link Random#nextInt(int)}, widened to long.
+   */
+  private long nextLong(Random rng, long n) {
+    long bits, val;
+    do {
+      // clear the sign bit to get a uniform non-negative 63-bit value
+      bits = (rng.nextLong() << 1) >>> 1;
+      val = bits % n;
+      // overflow means bits fell into the incomplete last bucket of size n;
+      // retry so the result has no modulo bias
+    } while (bits - val + (n - 1) < 0L);
+    return val;
+  }
+
+  /**
+   * Generates SIZE random values whose zigzag encodings pack at a width of
+   * numBits, writes them with writeInts and checks that readInts returns
+   * them unchanged.
+   *
+   * @param numBits expected packed width
+   */
+  private void runTest(int numBits) throws IOException {
+    Random rand = new Random(SEED);
+    long[] inp = new long[SIZE];
+    for (int i = 0; i < SIZE; i++) {
+      long val;
+      if (numBits == 1) {
+        // 0 or -1, which zigzag encode to 0 and 1
+        val = -1 * rand.nextInt(2);
+      } else if (numBits <= 32) {
+        // 1L << 31 exceeds Integer.MAX_VALUE, so the 32-bit bound is clamped
+        val = rand.nextInt((int) Math.min(1L << (numBits - 1), Integer.MAX_VALUE));
+      } else {
+        val = nextLong(rand, 1L << (numBits - 2));
+      }
+      // negate even values so both signs are exercised
+      if (val % 2 == 0) {
+        val = -val;
+      }
+      inp[i] = val;
+    }
+    long[] encoded = zigzagEncode(inp);
+    // writeInts packs the values themselves (not offsets from the minimum),
+    // so the width has to fit the largest value rather than the range.
+    long maxInput = Collections.max(Longs.asList(encoded));
+    int fixedWidth = SerializationUtils.findClosestNumBits(maxInput);
+    assertEquals(numBits, fixedWidth);
+
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    OutStream output = new OutStream("test", SIZE, null, collect);
+    SerializationUtils.writeInts(encoded, 0, encoded.length, fixedWidth, output);
+    output.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    long[] buff = new long[SIZE];
+    SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
+        InStream.create("test", inBuf, null, SIZE));
+    for (int i = 0; i < SIZE; i++) {
+      buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+    }
+    assertArrayEquals(inp, buff);
+  }
+
+  @Test
+  public void test01BitPacking1Bit() throws IOException {
+    runTest(1);
+  }
+
+  @Test
+  public void test02BitPacking2Bit() throws IOException {
+    runTest(2);
+  }
+
+  @Test
+  public void test03BitPacking3Bit() throws IOException {
+    runTest(3);
+  }
+
+  @Test
+  public void test04BitPacking4Bit() throws IOException {
+    runTest(4);
+  }
+
+  @Test
+  public void test05BitPacking5Bit() throws IOException {
+    runTest(5);
+  }
+
+  @Test
+  public void test06BitPacking6Bit() throws IOException {
+    runTest(6);
+  }
+
+  @Test
+  public void test07BitPacking7Bit() throws IOException {
+    runTest(7);
+  }
+
+  @Test
+  public void test08BitPacking8Bit() throws IOException {
+    runTest(8);
+  }
+
+  @Test
+  public void test09BitPacking9Bit() throws IOException {
+    runTest(9);
+  }
+
+  @Test
+  public void test10BitPacking10Bit() throws IOException {
+    runTest(10);
+  }
+
+  @Test
+  public void test11BitPacking11Bit() throws IOException {
+    runTest(11);
+  }
+
+  @Test
+  public void test12BitPacking12Bit() throws IOException {
+    runTest(12);
+  }
+
+  @Test
+  public void test13BitPacking13Bit() throws IOException {
+    runTest(13);
+  }
+
+  @Test
+  public void test14BitPacking14Bit() throws IOException {
+    runTest(14);
+  }
+
+  @Test
+  public void test15BitPacking15Bit() throws IOException {
+    runTest(15);
+  }
+
+  @Test
+  public void test16BitPacking16Bit() throws IOException {
+    runTest(16);
+  }
+
+  @Test
+  public void test17BitPacking17Bit() throws IOException {
+    runTest(17);
+  }
+
+  @Test
+  public void test18BitPacking18Bit() throws IOException {
+    runTest(18);
+  }
+
+  @Test
+  public void test19BitPacking19Bit() throws IOException {
+    runTest(19);
+  }
+
+  @Test
+  public void test20BitPacking20Bit() throws IOException {
+    runTest(20);
+  }
+
+  @Test
+  public void test21BitPacking21Bit() throws IOException {
+    runTest(21);
+  }
+
+  @Test
+  public void test22BitPacking22Bit() throws IOException {
+    runTest(22);
+  }
+
+  @Test
+  public void test23BitPacking23Bit() throws IOException {
+    runTest(23);
+  }
+
+  @Test
+  public void test24BitPacking24Bit() throws IOException {
+    runTest(24);
+  }
+
+  @Test
+  public void test26BitPacking26Bit() throws IOException {
+    runTest(26);
+  }
+
+  @Test
+  public void test28BitPacking28Bit() throws IOException {
+    runTest(28);
+  }
+
+  @Test
+  public void test30BitPacking30Bit() throws IOException {
+    runTest(30);
+  }
+
+  @Test
+  public void test32BitPacking32Bit() throws IOException {
+    runTest(32);
+  }
+
+  @Test
+  public void test40BitPacking40Bit() throws IOException {
+    runTest(40);
+  }
+
+  @Test
+  public void test48BitPacking48Bit() throws IOException {
+    runTest(48);
+  }
+
+  @Test
+  public void test56BitPacking56Bit() throws IOException {
+    runTest(56);
+  }
+
+  @Test
+  public void test64BitPacking64Bit() throws IOException {
+    runTest(64);
+  }
+}

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Test;
+
+/**
+ * Tests sequential reads, seeks and skips of {@link RunLengthIntegerReaderV2}
+ * over data written by {@link RunLengthIntegerWriterV2}.
+ */
+public class TestIntegerCompressionReader {
+
+  /** Number of values written and read back by the seek test. */
+  private static final int SEEK_COUNT = 4096;
+
+  /**
+   * Returns value i of the seek test data: runs of four equal values (i / 4)
+   * below index 1024, a sequence growing by two (2 * i) below 2048, and the
+   * random values from junk after that.
+   */
+  private static int seekValue(int i, int[] junk) {
+    if (i < 1024) {
+      return i / 4;
+    } else if (i < 2048) {
+      return 2 * i;
+    } else {
+      return junk[i - 2048];
+    }
+  }
+
+  /**
+   * Writes SEEK_COUNT values, recording the stream position before each one,
+   * then checks a full sequential read followed by a seek to every recorded
+   * position in reverse order.
+   *
+   * @param codec compression codec, or null for an uncompressed stream
+   */
+  public void runSeekTest(CompressionCodec codec) throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 1000, codec, collect), true);
+    TestInStream.PositionCollector[] positions =
+        new TestInStream.PositionCollector[SEEK_COUNT];
+    Random random = new Random(99);
+    int[] junk = new int[2048];
+    for (int i = 0; i < junk.length; ++i) {
+      junk[i] = random.nextInt();
+    }
+    for (int i = 0; i < SEEK_COUNT; ++i) {
+      positions[i] = new TestInStream.PositionCollector();
+      out.getPosition(positions[i]);
+      // test runs, incrementing runs, non-runs
+      out.write(seekValue(i, junk));
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(
+        InStream.create("test", inBuf, codec, 1000), true);
+    // Check every value written, including the random (non-run) section
+    // that starts at index 2048.
+    for (int i = 0; i < SEEK_COUNT; ++i) {
+      assertEquals(seekValue(i, junk), (int) in.next());
+    }
+    for (int i = SEEK_COUNT - 1; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(seekValue(i, junk), (int) in.next());
+    }
+  }
+
+  @Test
+  public void testUncompressedSeek() throws Exception {
+    runSeekTest(null);
+  }
+
+  @Test
+  public void testCompressedSeek() throws Exception {
+    runSeekTest(new ZlibCodec());
+  }
+
+  /**
+   * Writes 2048 values and reads every tenth one, skipping the nine in
+   * between; skip(0) is called after each read and must not consume
+   * anything.
+   */
+  @Test
+  public void testSkips() throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 100, null, collect), true);
+    for (int i = 0; i < 2048; ++i) {
+      if (i < 1024) {
+        out.write(i);
+      } else {
+        out.write(256 * i);
+      }
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(
+        InStream.create("test", inBuf, null, 100), true);
+    for (int i = 0; i < 2048; i += 10) {
+      int x = (int) in.next();
+      if (i < 1024) {
+        assertEquals(i, x);
+      } else {
+        assertEquals(256 * i, x);
+      }
+      // after the last read (index 2040) fewer than nine values remain
+      if (i < 2038) {
+        in.skip(9);
+      }
+      in.skip(0);
+    }
+  }
+}