You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/08/12 17:03:31 UTC
svn commit: r1513155 [2/3] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/ ql/...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+
+/**
+ * A reader that decodes a sequence of light weight compressed integers. Refer
+ * {@link RunLengthIntegerWriterV2} for a description of the four encodings
+ * (SHORT_REPEAT, DIRECT, PATCHED_BASE and DELTA) and their on-disk layout.
+ * <p>
+ * Values are decoded one run at a time into an internal buffer and handed out
+ * through {@link #next()}.
+ * </p>
+ */
+class RunLengthIntegerReaderV2 implements IntegerReader {
+  private final InStream input;
+  private final boolean signed;
+  // decoded values of the current run
+  private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+  // number of valid entries in literals
+  private int numLiterals = 0;
+  // number of entries of literals already returned to the caller
+  private int used = 0;
+
+  /**
+   * @param input stream holding the encoded runs
+   * @param signed whether values were written as signed (zigzag) integers
+   * @throws IOException declared for callers; nothing is read here
+   */
+  RunLengthIntegerReaderV2(InStream input, boolean signed) throws IOException {
+    this.input = input;
+    this.signed = signed;
+  }
+
+  /**
+   * Reads one header byte, failing loudly instead of letting the -1 end of
+   * stream marker leak into run length or bit width computations.
+   *
+   * @return the byte read, in the range 0..255
+   * @throws EOFException if the stream ends in the middle of a run header
+   */
+  private int readHeaderByte() throws IOException {
+    int b = input.read();
+    if (b < 0) {
+      throw new EOFException("Read past end of RLE integer from " + input);
+    }
+    return b;
+  }
+
+  /**
+   * Decodes the next run into {@link #literals}, dispatching on the encoding
+   * type stored in the two most significant bits of the first header byte.
+   *
+   * @throws EOFException if no more data is available
+   */
+  private void readValues() throws IOException {
+    // read the first 2 bits and determine the encoding type
+    int firstByte = input.read();
+    if (firstByte < 0) {
+      throw new EOFException("Read past end of RLE integer from " + input);
+    }
+
+    // the opcode on disk is the ordinal of the encoding type
+    int enc = (firstByte >>> 6) & 0x03;
+    if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
+      readShortRepeatValues(firstByte);
+    } else if (EncodingType.DIRECT.ordinal() == enc) {
+      readDirectValues(firstByte);
+    } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
+      readPatchedBaseValues(firstByte);
+    } else {
+      readDeltaValues(firstByte);
+    }
+  }
+
+  /**
+   * Decodes a DELTA run: a first value, a delta base and either a fixed delta
+   * or a bit packed blob of unsigned adjacent deltas.
+   *
+   * @param firstByte first header byte of the run (already consumed)
+   */
+  private void readDeltaValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits; 0 marks a fixed delta run
+    int fb = (firstByte >>> 1) & 0x1f;
+    if (fb != 0) {
+      fb = SerializationUtils.decodeBitWidth(fb);
+    }
+
+    // extract the blob run length (9 bits spread over two header bytes)
+    int len = (firstByte & 0x01) << 8;
+    len |= readHeaderByte();
+
+    // read the first value stored as vint
+    long firstVal;
+    if (signed) {
+      firstVal = SerializationUtils.readVslong(input);
+    } else {
+      firstVal = SerializationUtils.readVulong(input);
+    }
+
+    // store first value to result buffer
+    literals[numLiterals++] = firstVal;
+
+    if (fb == 0) {
+      // all values have a fixed delta, stored as a signed vint (deltas can be
+      // negative even if all numbers are positive)
+      long fd = SerializationUtils.readVslong(input);
+
+      // add fixed delta to the previous value
+      for (int i = 0; i < len; i++) {
+        literals[numLiterals] = literals[numLiterals - 1] + fd;
+        numLiterals++;
+      }
+    } else {
+      long deltaBase = SerializationUtils.readVslong(input);
+      // second value is first value plus the signed delta base
+      literals[numLiterals++] = firstVal + deltaBase;
+      long prevVal = literals[numLiterals - 1];
+      len -= 1;
+
+      // unpack the remaining deltas in place, then turn each delta into a
+      // value. if the delta base is negative then it is a decreasing
+      // sequence else an increasing sequence
+      SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+      while (len > 0) {
+        if (deltaBase < 0) {
+          literals[numLiterals] = prevVal - literals[numLiterals];
+        } else {
+          literals[numLiterals] = prevVal + literals[numLiterals];
+        }
+        prevVal = literals[numLiterals];
+        len--;
+        numLiterals++;
+      }
+    }
+  }
+
+  /**
+   * Decodes a PATCHED_BASE run: a base value, a blob of base reduced values
+   * and a patch list that restores the high bits of the outliers.
+   *
+   * @param firstByte first header byte of the run (already consumed)
+   */
+  private void readPatchedBaseValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = SerializationUtils.decodeBitWidth(fbo);
+
+    // extract the run length of data blob
+    int len = (firstByte & 0x01) << 8;
+    len |= readHeaderByte();
+    // runs are always one off
+    len += 1;
+
+    // extract the number of bytes occupied by base
+    int thirdByte = readHeaderByte();
+    int bw = (thirdByte >>> 5) & 0x07;
+    // base width is one off
+    bw += 1;
+
+    // extract patch width
+    int pwo = thirdByte & 0x1f;
+    int pw = SerializationUtils.decodeBitWidth(pwo);
+
+    // read fourth byte and extract patch gap width
+    int fourthByte = readHeaderByte();
+    int pgw = (fourthByte >>> 5) & 0x07;
+    // patch gap width is one off
+    pgw += 1;
+
+    // extract the length of the patch list
+    int pl = fourthByte & 0x1f;
+
+    // read the next base width number of bytes to extract base value
+    long base = SerializationUtils.bytesToLongBE(input, bw);
+    long mask = (1L << ((bw * 8) - 1));
+    // if MSB of base value is 1 then base is negative value else positive
+    if ((base & mask) != 0) {
+      base = base & ~mask;
+      base = -base;
+    }
+
+    // unpack the data blob
+    long[] unpacked = new long[len];
+    SerializationUtils.readInts(unpacked, 0, len, fb, input);
+
+    // unpack the patch blob. the writer packs each entry using the closest
+    // fixed bit width of (patch gap width + patch width), so the same rounding
+    // has to be applied here to stay aligned with the stream
+    long[] unpackedPatch = new long[pl];
+    int patchEntryBits = SerializationUtils.getClosestFixedBits(pw + pgw);
+    SerializationUtils.readInts(unpackedPatch, 0, pl, patchEntryBits, input);
+
+    // the mask must be computed with a long shift: an int shift only uses the
+    // low 5 bits of the distance and breaks for patch widths of 32 or more
+    long patchMask = (1L << pw) - 1;
+
+    // apply the patch directly when decoding the packed data
+    int patchIdx = 0;
+    long currPatch = 0;
+    // index of the next value to patch; -1 means there is nothing to patch
+    long actualGap = -1;
+
+    if (pl > 0) {
+      long currGap = unpackedPatch[patchIdx] >>> pw;
+      currPatch = unpackedPatch[patchIdx] & patchMask;
+      actualGap = 0;
+
+      // special case: gap is >255 then patch value will be 0.
+      // if gap is <=255 then patch value cannot be 0
+      while (currGap == 255 && currPatch == 0) {
+        actualGap += 255;
+        patchIdx++;
+        currGap = unpackedPatch[patchIdx] >>> pw;
+        currPatch = unpackedPatch[patchIdx] & patchMask;
+      }
+      // add the left over gap
+      actualGap += currGap;
+    }
+
+    // unpack data blob, patch it (if required), add base to get final result
+    for (int i = 0; i < unpacked.length; i++) {
+      if (i == actualGap) {
+        // restore the high bits that were stripped by the writer
+        long patchedVal = unpacked[i] | (currPatch << fb);
+
+        // add base to patched value
+        literals[numLiterals++] = base + patchedVal;
+
+        // increment the patch to point to next entry in patch list
+        patchIdx++;
+
+        if (patchIdx < pl) {
+          // read the next gap and patch
+          long currGap = unpackedPatch[patchIdx] >>> pw;
+          currPatch = unpackedPatch[patchIdx] & patchMask;
+          actualGap = 0;
+
+          // special case: gap is >255 then patch will be 0. if gap is
+          // <=255 then patch cannot be 0
+          while (currGap == 255 && currPatch == 0) {
+            actualGap += 255;
+            patchIdx++;
+            currGap = unpackedPatch[patchIdx] >>> pw;
+            currPatch = unpackedPatch[patchIdx] & patchMask;
+          }
+          // add the left over gap
+          actualGap += currGap;
+
+          // next gap is relative to the current gap
+          actualGap += i;
+        }
+      } else {
+        // no patching required. add base to unpacked value to get final value
+        literals[numLiterals++] = base + unpacked[i];
+      }
+    }
+
+  }
+
+  /**
+   * Decodes a DIRECT run: a blob of values bit packed with a fixed width,
+   * zigzag encoded when the stream is signed.
+   *
+   * @param firstByte first header byte of the run (already consumed)
+   */
+  private void readDirectValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = SerializationUtils.decodeBitWidth(fbo);
+
+    // extract the run length
+    int len = (firstByte & 0x01) << 8;
+    len |= readHeaderByte();
+    // runs are one off
+    len += 1;
+
+    // unpack the values straight into the result buffer, then zigzag decode
+    // them in place if required
+    SerializationUtils.readInts(literals, numLiterals, len, fb, input);
+    if (signed) {
+      for (int i = 0; i < len; i++) {
+        literals[numLiterals] = SerializationUtils
+            .zigzagDecode(literals[numLiterals]);
+        numLiterals++;
+      }
+    } else {
+      numLiterals += len;
+    }
+  }
+
+  /**
+   * Decodes a SHORT_REPEAT run: a single value stored in a fixed number of
+   * bytes, repeated a small number of times.
+   *
+   * @param firstByte the one byte header of the run (already consumed)
+   */
+  private void readShortRepeatValues(int firstByte) throws IOException {
+
+    // read the number of bytes occupied by the value
+    int size = (firstByte >>> 3) & 0x07;
+    // #bytes are one off
+    size += 1;
+
+    // read the run length
+    int len = firstByte & 0x07;
+    // run lengths values are stored only after MIN_REPEAT value is met
+    len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+    // read the repeated value which is stored using fixed bytes
+    long val = SerializationUtils.bytesToLongBE(input, size);
+
+    if (signed) {
+      val = SerializationUtils.zigzagDecode(val);
+    }
+
+    // repeat the value for length times
+    for (int i = 0; i < len; i++) {
+      literals[numLiterals++] = val;
+    }
+  }
+
+  /**
+   * @return true if a decoded value is still buffered or the underlying
+   *         stream reports more bytes available
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  /**
+   * Returns the next value, decoding a new run when the buffer is exhausted.
+   *
+   * @throws EOFException if the stream has no more runs
+   */
+  @Override
+  public long next() throws IOException {
+    if (used == numLiterals) {
+      numLiterals = 0;
+      used = 0;
+      readValues();
+    }
+    return literals[used++];
+  }
+
+  /**
+   * Positions the reader: seeks the underlying stream, then skips the number
+   * of values given by the next entry of the position provider.
+   */
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two
+      // parts
+      while (consumed > 0) {
+        numLiterals = 0;
+        readValues();
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  /**
+   * Skips the given number of values, decoding runs as needed.
+   *
+   * @param numValues number of values to discard
+   */
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        numLiterals = 0;
+        used = 0;
+        readValues();
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java Mon Aug 12 15:03:30 2013
@@ -25,7 +25,7 @@ import java.io.IOException;
* repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
* literal vint values follow.
*/
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
static final int MIN_REPEAT_SIZE = 3;
static final int MAX_DELTA = 127;
static final int MIN_DELTA = -128;
@@ -71,12 +71,14 @@ class RunLengthIntegerWriter {
}
}
- void flush() throws IOException {
+ @Override
+ public void flush() throws IOException {
writeValues();
output.flush();
}
- void write(long value) throws IOException {
+ @Override
+ public void write(long value) throws IOException {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
@@ -130,8 +132,10 @@ class RunLengthIntegerWriter {
}
}
- void getPosition(PositionRecorder recorder) throws IOException {
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
output.getPosition(recorder);
recorder.addPosition(numLiterals);
}
+
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs light weight compression over sequence of integers.
+ * <p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * </p>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (stored as run length - MIN_REPEAT)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required to encode base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.</li>
+ * <li>Data blob - Base reduced values as stored using fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.</li>
+ * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - encoded as varint</li>
+ * <li>Delta base - encoded as varint</li>
+ * <li>Delta blob - only positive values. Whether the sequence is increasing or
+ * decreasing is decided by the sign of the delta base</li>
+ * </ul>
+ * </p>
+ */
+class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+  /**
+   * The available encodings. The declaration order matters: the ordinal of
+   * each constant is the 2 bit opcode written to the run header.
+   */
+  public enum EncodingType {
+    SHORT_REPEAT,
+    DIRECT,
+    PATCHED_BASE,
+    DELTA
+  }
+
+ static final int MAX_SCOPE = 512;
+ static final int MIN_REPEAT = 3;
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ private long prevDelta = 0;
+ private int fixedRunLength = 0;
+ private int variableRunLength = 0;
+ private final long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private EncodingType encoding;
+ private int numLiterals;
+ private long[] zigzagLiterals;
+ private long[] baseRedLiterals;
+ private long[] adjDeltas;
+ private long fixedDelta;
+ private int zzBits90p;
+ private int zzBits100p;
+ private int brBits95p;
+ private int brBits100p;
+ private int bitsDeltaMax;
+ private int patchWidth;
+ private int patchGapWidth;
+ private int patchLength;
+ private long[] gapVsPatchList;
+ private long min;
+ private boolean isFixedDelta;
+
+  /**
+   * Creates a writer with an empty literal buffer.
+   *
+   * @param output stream that receives the encoded runs
+   * @param signed whether values are signed; selects zigzag / signed vint
+   *          encoding where applicable
+   */
+  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+    this.signed = signed;
+    this.output = output;
+    // start from a clean per-run state
+    clear();
+  }
+
+  /**
+   * Writes the buffered literals using the encoding selected beforehand and
+   * resets the per-run state. Does nothing when no literals are buffered.
+   */
+  private void writeValues() throws IOException {
+    if (numLiterals == 0) {
+      return;
+    }
+
+    switch (encoding) {
+    case SHORT_REPEAT:
+      writeShortRepeatValues();
+      break;
+    case DIRECT:
+      writeDirectValues();
+      break;
+    case PATCHED_BASE:
+      writePatchedBaseValues();
+      break;
+    default:
+      writeDeltaValues();
+      break;
+    }
+
+    // the run is on disk, forget everything about it
+    clear();
+  }
+
+ private void writeDeltaValues() throws IOException {
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int efb = 0;
+
+ if (isFixedDelta) {
+ // if fixed run length is greater than threshold then it will be fixed
+ // delta sequence with delta value 0 else fixed delta sequence with
+ // non-zero delta value
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ // fixed width 0 is used for long repeating values.
+ // sequences that require only 1 bit to encode will have an additional bit
+ if (fb == 1) {
+ fb = 2;
+ }
+ efb = SerializationUtils.encodeBitWidth(fb);
+ efb = efb << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = len & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first value from zigzag literal array
+ if (signed) {
+ SerializationUtils.writeVslong(output, literals[0]);
+ } else {
+ SerializationUtils.writeVulong(output, literals[0]);
+ }
+
+ if (isFixedDelta) {
+ // if delta is fixed then we don't need to store delta blob
+ SerializationUtils.writeVslong(output, fixedDelta);
+ } else {
+ // store the first value as delta value using zigzag encoding
+ SerializationUtils.writeVslong(output, adjDeltas[0]);
+ // adjacent delta values are bit packed
+ SerializationUtils.writeInts(adjDeltas, 1, adjDeltas.length - 1, fb,
+ output);
+ }
+ }
+
+ private void writePatchedBaseValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = brBits95p;
+ int efb = SerializationUtils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // if the min value is negative toggle the sign
+ boolean isNegative = min < 0 ? true : false;
+ if (isNegative) {
+ min = -min;
+ }
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width. The additional bit is used to store the sign
+ // of the base value.
+ int baseWidth = SerializationUtils.findClosestNumBits(min) + 1;
+ int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ int bb = (baseBytes - 1) << 5;
+
+ // if the base value is negative then set MSB to 1
+ if (isNegative) {
+ min |= (1L << ((baseBytes * 8) - 1));
+ }
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ int headerThirdByte = bb | SerializationUtils.encodeBitWidth(patchWidth);
+
+ // fourth byte contains 3 bits for page gap width and 5 bits for
+ // patch length
+ int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for(int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((min >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // base reduced literals are bit packed
+ int closestFixedBits = SerializationUtils.getClosestFixedBits(brBits95p);
+ SerializationUtils.writeInts(baseRedLiterals, 0, baseRedLiterals.length,
+ closestFixedBits, output);
+
+ // write patch list
+ closestFixedBits = SerializationUtils.getClosestFixedBits(patchGapWidth
+ + patchWidth);
+ SerializationUtils.writeInts(gapVsPatchList, 0, gapVsPatchList.length,
+ closestFixedBits, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ /**
+ * Store the opcode in 2 MSB bits
+ * @return opcode
+ */
+ private int getOpcode() {
+ return encoding.ordinal() << 6;
+ }
+
+ private void writeDirectValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int efb = SerializationUtils.encodeBitWidth(zzBits100p) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit packing the zigzag encoded literals
+ SerializationUtils.writeInts(zigzagLiterals, 0, zigzagLiterals.length,
+ zzBits100p, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = SerializationUtils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ int numBitsRepeatVal = SerializationUtils.findClosestNumBits(repeatVal);
+ int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // write encoding type in top 2 bits
+ int header = getOpcode();
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the repeating value in big endian byte order
+ for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ fixedRunLength = 0;
+ }
+
+ private void determineEncoding() {
+ // used for direct encoding
+ zigzagLiterals = new long[numLiterals];
+
+ // used for patched base encoding
+ baseRedLiterals = new long[numLiterals];
+
+ // used for delta encoding
+ adjDeltas = new long[numLiterals - 1];
+
+ int idx = 0;
+
+ // for identifying monotonic sequences
+ boolean isIncreasing = false;
+ int increasingCount = 1;
+ boolean isDecreasing = false;
+ int decreasingCount = 1;
+
+ // for identifying type of delta encoding
+ min = literals[0];
+ long max = literals[0];
+ isFixedDelta = true;
+ long currDelta = 0;
+
+ min = literals[0];
+ long deltaMax = 0;
+
+ // populate all variables to identify the encoding type
+ if (numLiterals >= 1) {
+ currDelta = literals[1] - literals[0];
+ for(int i = 0; i < numLiterals; i++) {
+ if (i > 0 && literals[i] >= max) {
+ max = literals[i];
+ increasingCount++;
+ }
+
+ if (i > 0 && literals[i] <= min) {
+ min = literals[i];
+ decreasingCount++;
+ }
+
+ // if delta doesn't changes then mark it as fixed delta
+ if (i > 0 && isFixedDelta) {
+ if (literals[i] - literals[i - 1] != currDelta) {
+ isFixedDelta = false;
+ }
+
+ fixedDelta = currDelta;
+ }
+
+ // populate zigzag encoded literals
+ long zzEncVal = 0;
+ if (signed) {
+ zzEncVal = SerializationUtils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ zigzagLiterals[idx] = zzEncVal;
+ idx++;
+
+ // max delta value is required for computing the fixed bits
+ // required for delta blob in delta encoding
+ if (i > 0) {
+ if (i == 1) {
+ // first value preserve the sign
+ adjDeltas[i - 1] = literals[i] - literals[i - 1];
+ } else {
+ adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
+ if (adjDeltas[i - 1] > deltaMax) {
+ deltaMax = adjDeltas[i - 1];
+ }
+ }
+ }
+ }
+
+ // stores the number of bits required for packing delta blob in
+ // delta encoding
+ bitsDeltaMax = SerializationUtils.findClosestNumBits(deltaMax);
+
+ // if decreasing count equals total number of literals then the
+ // sequence is monotonically decreasing
+ if (increasingCount == 1 && decreasingCount == numLiterals) {
+ isDecreasing = true;
+ }
+
+ // if increasing count equals total number of literals then the
+ // sequence is monotonically increasing
+ if (decreasingCount == 1 && increasingCount == numLiterals) {
+ isIncreasing = true;
+ }
+ }
+
+ // if the sequence is both increasing and decreasing then it is not
+ // monotonic
+ if (isDecreasing && isIncreasing) {
+ isDecreasing = false;
+ isIncreasing = false;
+ }
+
+ // fixed delta condition
+ if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ // monotonic condition
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ // percentile values are computed for the zigzag encoded values. if the
+ // number of bit requirement between 90th and 100th percentile varies
+ // beyond a threshold then we need to patch the values. if the variation
+ // is not significant then we can use direct or delta encoding
+
+ double p = 0.9;
+ zzBits90p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+ p = 1.0;
+ zzBits100p = SerializationUtils.percentileBits(zigzagLiterals, p);
+
+ int diffBitsLH = zzBits100p - zzBits90p;
+
+ // if the difference between 90th percentile and 100th percentile fixed
+ // bits is > 1 then we need patch the values
+ if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1
+ && isFixedDelta == false) {
+ // patching is done only on base reduced values.
+ // remove base from literals
+ for(int i = 0; i < zigzagLiterals.length; i++) {
+ baseRedLiterals[i] = literals[i] - min;
+ }
+
+ // 95th percentile width is used to determine max allowed value
+ // after which patching will be done
+ p = 0.95;
+ brBits95p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+ // 100th percentile is used to compute the max patch width
+ p = 1.0;
+ brBits100p = SerializationUtils.percentileBits(baseRedLiterals, p);
+
+ // after base reducing the values, if the difference in bits between
+ // 95th percentile and 100th percentile value is zero then there
+ // is no point in patching the values, in which case we will
+ // fallback to DIRECT encoding.
+ // The decision to use patched base was based on zigzag values, but the
+ // actual patching is done on base reduced literals.
+ if ((brBits100p - brBits95p) != 0) {
+ encoding = EncodingType.PATCHED_BASE;
+ preparePatchedBlob();
+ return;
+ } else {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ }
+
+ // if difference in bits between 95th percentile and 100th percentile is
+ // 0, then patch length will become 0. Hence we will fallback to direct
+ if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
+ && isFixedDelta == false) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // this should not happen
+ if (encoding == null) {
+ throw new RuntimeException("Integer encoding cannot be determined.");
+ }
+ }
+
+  /**
+   * Builds the gap/patch list for PATCHED_BASE encoding.
+   * <p>
+   * Every base reduced literal that does not fit in {@code brBits95p} bits
+   * is split: its low bits stay in {@code baseRedLiterals}, its high bits
+   * become a patch, and the distance to the previously patched element
+   * becomes the gap. The method sets {@code patchWidth},
+   * {@code patchGapWidth}, {@code patchLength} and {@code gapVsPatchList}.
+   * </p>
+   * All masks and shifts are done on longs: an int shift only uses the low
+   * 5 bits of the shift distance, which silently produces wrong masks once
+   * the data width reaches 32 bits and corrupts the 255 gap marker once the
+   * patch width reaches 24 bits.
+   */
+  private void preparePatchedBlob() {
+    // mask will be max value beyond which patch will be generated
+    long mask = (1L << brBits95p) - 1;
+
+    // since we are considering only 95 percentile, the size of gap and
+    // patch array can contain only be 5% values
+    patchLength = (int) Math.ceil((baseRedLiterals.length * 0.05));
+    int[] gapList = new int[patchLength];
+    long[] patchList = new long[patchLength];
+
+    // #bit for patch
+    patchWidth = brBits100p - brBits95p;
+    patchWidth = SerializationUtils.getClosestFixedBits(patchWidth);
+
+    int gapIdx = 0;
+    int patchIdx = 0;
+    int prev = 0;
+    int maxGap = 0;
+
+    for (int i = 0; i < baseRedLiterals.length; i++) {
+      // if value is above mask then create the patch and record the gap
+      if (baseRedLiterals[i] > mask) {
+        int gap = i - prev;
+        if (gap > maxGap) {
+          maxGap = gap;
+        }
+
+        // gaps are relative, so store the previous patched value index
+        prev = i;
+        gapList[gapIdx++] = gap;
+
+        // extract the most significant bits that are over mask bits
+        long patch = baseRedLiterals[i] >>> brBits95p;
+        patchList[patchIdx++] = patch;
+
+        // strip off the MSB to enable safe bit packing
+        baseRedLiterals[i] &= mask;
+      }
+    }
+
+    // adjust the patch length to number of entries in gap list
+    patchLength = gapIdx;
+
+    // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
+    if (maxGap == 0 && patchLength != 0) {
+      patchGapWidth = 1;
+    } else {
+      patchGapWidth = SerializationUtils.findClosestNumBits(maxGap);
+    }
+
+    // special case: if the patch gap width is greater than 256, then
+    // we need 9 bits to encode the gap width. But we only have 3 bits in
+    // header to record the gap width. To deal with this case, we will save
+    // two entries in patch list in the following way
+    // 256 gap width => 0 for patch value
+    // actual gap - 256 => actual patch value
+    // We will do the same for gap width = 511. If the element to be patched is
+    // the last element in the scope then gap width will be 511. In this case we
+    // will have 3 entries in the patch list in the following way
+    // 255 gap width => 0 for patch value
+    // 255 gap width => 0 for patch value
+    // 1 gap width => actual patch value
+    if (patchGapWidth > 8) {
+      patchGapWidth = 8;
+      // for gap = 511, we need two additional entries in patch list
+      if (maxGap == 511) {
+        patchLength += 2;
+      } else {
+        patchLength += 1;
+      }
+    }
+
+    // create gap vs patch list
+    gapIdx = 0;
+    patchIdx = 0;
+    gapVsPatchList = new long[patchLength];
+    for (int i = 0; i < patchLength; i++) {
+      long g = gapList[gapIdx++];
+      long p = patchList[patchIdx++];
+      // gaps above 255 are spread over filler entries with a zero patch;
+      // the reader recognises (gap == 255, patch == 0) and keeps adding
+      while (g > 255) {
+        gapVsPatchList[i++] = (255L << patchWidth);
+        g -= 255;
+      }
+
+      // store patch value in LSBs and gap in MSBs
+      gapVsPatchList[i] = (g << patchWidth) | p;
+    }
+  }
+
+ /**
+  * Resets the encoder state listed below to its initial value: the literal
+  * count, the chosen encoding, the derived literal arrays and the computed
+  * bit widths. fixedRunLength and variableRunLength are not touched here.
+  */
+ private void clear() {
+   // literal count, encoding choice and minimum
+   numLiterals = 0;
+   encoding = null;
+   min = 0;
+
+   // delta related fields
+   prevDelta = 0;
+   fixedDelta = 0;
+   isFixedDelta = false;
+   adjDeltas = null;
+   bitsDeltaMax = 0;
+
+   // zigzag literal fields
+   zigzagLiterals = null;
+   zzBits90p = 0;
+   zzBits100p = 0;
+
+   // base reduced literal and patch fields
+   baseRedLiterals = null;
+   brBits95p = 0;
+   brBits100p = 0;
+   patchGapWidth = 0;
+   patchLength = 0;
+   patchWidth = 0;
+   gapVsPatchList = null;
+ }
+
+ /**
+  * Writes out any values that are still buffered and then flushes the
+  * underlying output stream.
+  *
+  * @throws IOException
+  */
+ @Override
+ public void flush() throws IOException {
+   if (numLiterals != 0) {
+     if (variableRunLength != 0) {
+       determineEncoding();
+       writeValues();
+     } else if (fixedRunLength != 0) {
+       // pick the encoding for the pending fixed run; every case ends by
+       // writing the values
+       if (fixedRunLength < MIN_REPEAT) {
+         // too short to be written as a repeat, treat it as a variable run
+         variableRunLength = fixedRunLength;
+         fixedRunLength = 0;
+         determineEncoding();
+       } else if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+         encoding = EncodingType.SHORT_REPEAT;
+       } else {
+         encoding = EncodingType.DELTA;
+         isFixedDelta = true;
+       }
+       writeValues();
+     }
+   }
+   output.flush();
+ }
+
+ /**
+ * Buffers one value and updates the run tracking state. Consecutive equal
+ * values are counted in fixedRunLength, other values in variableRunLength.
+ * Buffered values are encoded and written when a fixed run of at least
+ * MIN_REPEAT values shows up at the end of a variable run, when a fixed run
+ * of at least MIN_REPEAT values is followed by a different value, or when
+ * either run length reaches MAX_SCOPE.
+ *
+ * @param val - value to add
+ * @throws IOException
+ */
+ @Override
+ public void write(long val) throws IOException {
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ if (numLiterals == 1) {
+ prevDelta = val - literals[0];
+ literals[numLiterals++] = val;
+ // if both values are same count as fixed run else variable run
+ if (val == literals[0]) {
+ fixedRunLength = 2;
+ variableRunLength = 0;
+ } else {
+ fixedRunLength = 0;
+ variableRunLength = 2;
+ }
+ } else {
+ long currentDelta = val - literals[numLiterals - 1];
+ if (prevDelta == 0 && currentDelta == 0) {
+ // fixed delta run
+
+ literals[numLiterals++] = val;
+
+ // if variable run is non-zero then we are seeing repeating
+ // values at the end of variable run in which case keep
+ // updating variable and fixed runs
+ if (variableRunLength > 0) {
+ fixedRunLength = 2;
+ }
+ fixedRunLength += 1;
+
+ // if fixed run met the minimum condition and if variable
+ // run is non-zero then flush the variable run and shift the
+ // tail fixed runs to start of the buffer
+ if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+ // the last MIN_REPEAT buffered values (including val) are the fixed
+ // run; val itself was not counted in variableRunLength by this branch,
+ // hence only MIN_REPEAT - 1 is subtracted there
+ numLiterals -= MIN_REPEAT;
+ variableRunLength -= MIN_REPEAT - 1;
+ // copy the tail fixed runs
+ long[] tailVals = new long[MIN_REPEAT];
+ System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+ // determine variable encoding and flush values
+ determineEncoding();
+ writeValues();
+
+ // shift tail fixed runs to beginning of the buffer
+ for(long l : tailVals) {
+ literals[numLiterals++] = l;
+ }
+ }
+
+ // if fixed runs reached max repeat length then write values
+ if (fixedRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ } else {
+ // variable delta run
+
+ // if fixed run length is non-zero and if it satisfies the
+ // short repeat conditions then write the values as short repeats
+ // else use delta encoding
+ if (fixedRunLength >= MIN_REPEAT) {
+ if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+
+ // if fixed run length is <MIN_REPEAT and current value is
+ // different from previous then treat it as variable run
+ if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+ if (val != literals[numLiterals - 1]) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ }
+ }
+
+ // after writing values re-initialize the variables
+ // NOTE(review): relies on writeValues() leaving numLiterals at 0 -
+ // confirm against writeValues()
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // keep updating variable run lengths
+ prevDelta = val - literals[numLiterals - 1];
+ literals[numLiterals++] = val;
+ variableRunLength += 1;
+
+ // if variable run length reach the max scope, write it
+ if (variableRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+  * Appends the given value to the literal buffer and sets both the fixed
+  * and the variable run length to 1.
+  *
+  * @param val - value to store
+  */
+ private void initializeLiterals(long val) {
+   literals[numLiterals] = val;
+   numLiterals += 1;
+   variableRunLength = 1;
+   fixedRunLength = 1;
+ }
+
+ /**
+ * Records the current position: first the position of the underlying
+ * output stream, then the number of literals buffered in this writer.
+ *
+ * @param recorder - recorder that receives the position
+ * @throws IOException
+ */
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Mon Aug 12 15:03:30 2013
@@ -185,4 +185,279 @@ final class SerializationUtils {
result = result.shiftRight(1);
return result;
}
+
+ /**
+  * The fixed bit widths supported by the bit packing routines. The ordinal
+  * of a constant is used as the encoded form of its width (see
+  * encodeBitWidth and decodeBitWidth), so the constants must stay in this
+  * order.
+  */
+ enum FixedBitSizes {
+   ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT,
+   NINE, TEN, ELEVEN, TWELVE, THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN,
+   SEVENTEEN, EIGHTEEN, NINETEEN, TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE,
+   TWENTYFOUR,
+   TWENTYSIX, TWENTYEIGHT, THIRTY, THIRTYTWO,
+   FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR
+ }
+
+ /**
+  * Count the number of bits required to encode the given value and round it
+  * up to the closest supported fixed bit width. The value is treated as an
+  * unsigned 64 bit quantity, so a value with the most significant bit set
+  * requires 64 bits.
+  * @param value - value to measure
+  * @return closest fixed bit width that can store value (1 for value 0)
+  */
+ static int findClosestNumBits(long value) {
+   // index of the highest set bit plus one, 0 when no bit is set. The
+   // earlier "while (value > 0)" loop stopped immediately for values with
+   // the sign bit set and reported 1 bit for them instead of 64.
+   int count = Long.SIZE - Long.numberOfLeadingZeros(value);
+   return getClosestFixedBits(count);
+ }
+
+ /**
+  * Zigzag encode the given value: the value is shifted left by one bit and
+  * the sign is folded into the least significant bit.
+  * @param val - signed value
+  * @return zigzag encoded value
+  */
+ static long zigzagEncode(long val) {
+   // all ones when val is negative, zero otherwise
+   final long signMask = val >> 63;
+   final long shifted = val << 1;
+   return shifted ^ signMask;
+ }
+
+ /**
+  * Zigzag decode the given value: the least significant bit carries the
+  * sign, the remaining bits carry the magnitude.
+  * @param val - zigzag encoded value
+  * @return zigzag decoded value
+  */
+ static long zigzagDecode(long val) {
+   final long magnitude = val >>> 1;
+   // all ones when the sign bit (bit 0) is set, zero otherwise
+   final long signMask = -(val & 1);
+   return magnitude ^ signMask;
+ }
+
+ /**
+  * Compute the bits required to represent pth percentile value, i.e. a
+  * fixed bit width such that at most (1.0 - p) of the values (rounded down
+  * to a whole count) need a wider bucket.
+  * @param data - array of values to examine
+  * @param p - percentile value (>0.0 to <=1.0)
+  * @return pth percentile bits, -1 if p is outside the valid range (or NaN),
+  *         0 if data is empty
+  */
+ static int percentileBits(long[] data, double p) {
+   // written as a negated range check so that NaN is rejected as well
+   if (!(p > 0.0 && p <= 1.0)) {
+     return -1;
+   }
+
+   // histogram that stores the encoded bit requirement for each value, one
+   // bucket per supported fixed bit width (refer FixedBitSizes)
+   int[] hist = new int[FixedBitSizes.values().length];
+
+   // compute the histogram
+   for (long l : data) {
+     int idx = encodeBitWidth(findClosestNumBits(l));
+     hist[idx] += 1;
+   }
+
+   // number of values that are allowed to need more than the returned width
+   int perLen = (int) (data.length * (1.0 - p));
+
+   // walk down from the widest bucket; the first bucket that uses up more
+   // than the allowed count gives the pth percentile width
+   for (int i = hist.length - 1; i >= 0; i--) {
+     perLen -= hist[i];
+     if (perLen < 0) {
+       return decodeBitWidth(i);
+     }
+   }
+
+   return 0;
+ }
+
+ /**
+  * Read n bytes in big endian order and convert to long
+  * @param input - stream to read the bytes from
+  * @param n - number of bytes to read
+  * @return long value
+  * @throws IOException if reading fails or the stream ends before n bytes
+  *           could be read
+  */
+ static long bytesToLongBE(InStream input, int n) throws IOException {
+   long out = 0;
+   while (n > 0) {
+     n--;
+     // store it in a long and then shift else integer overflow will occur
+     long val = input.read();
+     // assumes InStream.read() follows the java.io.InputStream contract and
+     // returns -1 at end of stream - TODO confirm. OR-ing -1 into the result
+     // would silently set every bit above the current byte.
+     if (val < 0) {
+       throw new java.io.EOFException("Read past end of stream from " + input);
+     }
+     out |= (val << (n * 8));
+   }
+   return out;
+ }
+
+ /**
+  * Calculate the number of bytes required to hold n bit packed values,
+  * rounding up to a whole byte.
+  * @param n - number of values
+  * @param numBits - bit width of each value
+  * @return number of bytes required
+  */
+ static int getTotalBytesRequired(int n, int numBits) {
+   final int totalBits = n * numBits;
+   // adding 7 before the division rounds a partial byte up
+   return (totalBits + 7) / 8;
+ }
+
+ /**
+  * For a given bit width this function returns the closest supported fixed
+  * bit width that is not smaller than it. Widths 1 to 24 are supported
+  * as is, above that only 26, 28, 30, 32, 40, 48, 56 and 64 are available.
+  * @param n - required bit width
+  * @return closest valid fixed bit width (1 for n == 0, 64 for anything
+  *         above 56 and for negative n)
+  */
+ static int getClosestFixedBits(int n) {
+   if (n == 0) {
+     return 1;
+   }
+
+   // negative widths match none of the ranges and end up at the widest size
+   if (n < 0) {
+     return 64;
+   }
+
+   if (n <= 24) {
+     return n;
+   }
+
+   // supported widths above 24, in ascending order
+   final int[] widerSizes = {26, 28, 30, 32, 40, 48, 56};
+   for (int size : widerSizes) {
+     if (n <= size) {
+       return size;
+     }
+   }
+   return 64;
+ }
+
+ /**
+  * Finds the closest available fixed bit width match and returns its encoded
+  * value (the ordinal of the matching FixedBitSizes constant)
+  * @param n - fixed bit width to encode
+  * @return encoded fixed bit width
+  */
+ static int encodeBitWidth(int n) {
+   // getClosestFixedBits only returns 1..24, 26, 28, 30, 32, 40, 48, 56 or 64
+   final int width = getClosestFixedBits(n);
+
+   // widths 1 to 24 are encoded as width - 1
+   if (width <= 24) {
+     return width - 1;
+   }
+
+   switch (width) {
+     case 26:
+       return FixedBitSizes.TWENTYSIX.ordinal();
+     case 28:
+       return FixedBitSizes.TWENTYEIGHT.ordinal();
+     case 30:
+       return FixedBitSizes.THIRTY.ordinal();
+     case 32:
+       return FixedBitSizes.THIRTYTWO.ordinal();
+     case 40:
+       return FixedBitSizes.FORTY.ordinal();
+     case 48:
+       return FixedBitSizes.FORTYEIGHT.ordinal();
+     case 56:
+       return FixedBitSizes.FIFTYSIX.ordinal();
+     default:
+       return FixedBitSizes.SIXTYFOUR.ordinal();
+   }
+ }
+
+ /**
+  * Decodes the ordinal fixed bit value to actual fixed bit width value
+  * @param n - encoded fixed bit width (ordinal of a FixedBitSizes constant)
+  * @return decoded fixed bit width, 64 if n is not a valid ordinal
+  */
+ static int decodeBitWidth(int n) {
+   final FixedBitSizes[] sizes = FixedBitSizes.values();
+   // anything that is not an ordinal decodes to the widest size
+   if (n < 0 || n >= sizes.length) {
+     return 64;
+   }
+
+   switch (sizes[n]) {
+     case TWENTYSIX:
+       return 26;
+     case TWENTYEIGHT:
+       return 28;
+     case THIRTY:
+       return 30;
+     case THIRTYTWO:
+       return 32;
+     case FORTY:
+       return 40;
+     case FORTYEIGHT:
+       return 48;
+     case FIFTYSIX:
+       return 56;
+     case SIXTYFOUR:
+       return 64;
+     default:
+       // ONE to TWENTYFOUR: the width is the ordinal plus one
+       return n + 1;
+   }
+ }
+
+ /**
+  * Bitpack and write the input values to underlying output stream. Values
+  * are packed most significant bit first; a partially filled last byte is
+  * written with its unused low order bits set to zero. Nothing is written if
+  * input is null or empty, offset is negative, or len or bitSize is less
+  * than 1.
+  * @param input - values to write
+  * @param offset - index of the first value to write
+  * @param len - number of values to write
+  * @param bitSize - bit width of each packed value
+  * @param output - output stream
+  * @throws IOException
+  */
+ static void writeInts(long[] input, int offset, int len, int bitSize,
+     OutputStream output) throws IOException {
+   final boolean noInput = input == null || input.length < 1;
+   if (noInput || offset < 0 || len < 1 || bitSize < 1) {
+     return;
+   }
+
+   final int end = offset + len;
+   // number of bits still unused in the pending byte
+   int freeBits = 8;
+   byte pending = 0;
+   for (int idx = offset; idx < end; idx++) {
+     long remaining = input[idx];
+     int width = bitSize;
+     // the value does not fit into the pending byte: fill it up with the
+     // top bits of the value and write it out, until the rest fits
+     while (width > freeBits) {
+       pending |= remaining >>> (width - freeBits);
+       width -= freeBits;
+       // keep only the bits that have not been written yet
+       remaining &= (1L << width) - 1;
+       output.write(pending);
+       pending = 0;
+       freeBits = 8;
+     }
+     // place the rest of the value right below the bits already in use
+     freeBits -= width;
+     pending |= remaining << freeBits;
+     if (freeBits == 0) {
+       output.write(pending);
+       pending = 0;
+       freeBits = 8;
+     }
+   }
+
+   // write out the partially filled last byte
+   if (freeBits != 8) {
+     output.write(pending);
+   }
+ }
+
+ /**
+ * Read bitpacked integers from input stream. Each value is assembled most
+ * significant bit first from bitSize consecutive bits of the stream and is
+ * stored into buffer[offset] .. buffer[offset + len - 1].
+ * NOTE(review): the result of input.read() is not checked for end of
+ * stream - confirm callers guarantee enough bytes are available.
+ * @param buffer - buffer that receives the unpacked values
+ * @param offset - index in buffer of the first value to store
+ * @param len - number of values to read
+ * @param bitSize - bit width of each packed value
+ * @param input - input stream
+ * @throws IOException
+ */
+ static void readInts(long[] buffer, int offset, int len, int bitSize,
+ InStream input) throws IOException {
+ // number of bits of current that have not been consumed yet
+ int bitsLeft = 0;
+ int current = 0;
+
+ for(int i = offset; i < (offset + len); i++) {
+ long result = 0;
+ int bitsLeftToRead = bitSize;
+ // use up what is left of the current byte and fetch the next one, until
+ // the rest of the value fits into the current byte
+ while (bitsLeftToRead > bitsLeft) {
+ result <<= bitsLeft;
+ result |= current & ((1 << bitsLeft) - 1);
+ bitsLeftToRead -= bitsLeft;
+ current = input.read();
+ bitsLeft = 8;
+ }
+
+ // handle the left over bits
+ if (bitsLeftToRead > 0) {
+ result <<= bitsLeftToRead;
+ bitsLeft -= bitsLeftToRead;
+ result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+ }
+ buffer[i] = result;
+ }
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Mon Aug 12 15:03:30 2013
@@ -381,6 +381,7 @@ class WriterImpl implements Writer, Memo
private final PositionedOutputStream rowIndexStream;
private boolean foundNulls;
private OutStream isPresentOutStream;
+ protected final boolean useDirectV2Encoding;
/**
* Create a tree writer.
@@ -396,6 +397,7 @@ class WriterImpl implements Writer, Memo
this.isCompressed = streamFactory.isCompressed();
this.id = columnId;
this.inspector = inspector;
+ this.useDirectV2Encoding = true;
if (nullable) {
isPresentOutStream = streamFactory.createStream(id,
OrcProto.Stream.Kind.PRESENT);
@@ -430,6 +432,32 @@ class WriterImpl implements Writer, Memo
return rowIndexEntry;
}
+ IntegerWriter createIntegerWriter(PositionedOutputStream output,
+ boolean signed, boolean isDirectV2) {
+ if (isDirectV2) {
+ return new RunLengthIntegerWriterV2(output, signed);
+ } else {
+ return new RunLengthIntegerWriter(output, signed);
+ }
+ }
+
+ boolean isNewWriteFormat(StreamFactory writer) {
+ String writeFormat = writer.getConfiguration().get(
+ HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+ if (writeFormat == null) {
+ LOG.warn("ORC write format not defined. Using 0.12 ORC write format.");
+ return true;
+ }
+ if (writeFormat
+ .equals(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.defaultVal)) {
+ LOG.info("Using 0.11 ORC write format.");
+ return false;
+ }
+
+ LOG.info("Using 0.12 ORC write format.");
+ return true;
+ }
+
/**
* Add a new value to the column.
* @param obj
@@ -635,10 +663,11 @@ class WriterImpl implements Writer, Memo
}
private static class IntegerTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter writer;
+ private final IntegerWriter writer;
private final ShortObjectInspector shortInspector;
private final IntObjectInspector intInspector;
private final LongObjectInspector longInspector;
+ private boolean isDirectV2 = true;
IntegerTreeWriter(int columnId,
ObjectInspector inspector,
@@ -647,7 +676,8 @@ class WriterImpl implements Writer, Memo
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(out, true, isDirectV2);
if (inspector instanceof IntObjectInspector) {
intInspector = (IntObjectInspector) inspector;
shortInspector = null;
@@ -666,6 +696,16 @@ class WriterImpl implements Writer, Memo
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -776,13 +816,13 @@ class WriterImpl implements Writer, Memo
private static class StringTreeWriter extends TreeWriter {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final OutStream stringOutput;
- private final RunLengthIntegerWriter lengthOutput;
- private final RunLengthIntegerWriter rowOutput;
+ private final IntegerWriter lengthOutput;
+ private final IntegerWriter rowOutput;
private final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
private final DynamicIntArray rows = new DynamicIntArray();
private final PositionedOutputStream directStreamOutput;
- private final RunLengthIntegerWriter directLengthOutput;
+ private final IntegerWriter directLengthOutput;
private final List<OrcProto.RowIndexEntry> savedRowIndex =
new ArrayList<OrcProto.RowIndexEntry>();
private final boolean buildIndex;
@@ -791,25 +831,26 @@ class WriterImpl implements Writer, Memo
//the total number of non-null rows, turn off dictionary encoding
private final float dictionaryKeySizeThreshold;
private boolean useDictionaryEncoding = true;
+ private boolean isDirectV2 = true;
StringTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
stringOutput = writer.createStream(id,
OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
- rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false);
+ lengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
+ rowOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false, isDirectV2);
recordPosition(rowIndexPosition);
rowIndexValueCount.add(0L);
buildIndex = writer.buildIndex();
directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- directLengthOutput =
- new RunLengthIntegerWriter(writer.createStream
- (id, OrcProto.Stream.Kind.LENGTH), false);
+ directLengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
dictionaryKeySizeThreshold = writer.getConfiguration().getFloat(
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname,
HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.
@@ -906,24 +947,23 @@ class WriterImpl implements Writer, Memo
rowIndexValueCount.add(0L);
}
- // Calls getPosition on the row output stream if dictionary encoding is used, and the direct
- // output stream if direct encoding is used
- private void recordOutputPosition(OrcProto.RowIndexEntry.Builder base) throws IOException {
- if (useDictionaryEncoding) {
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- } else {
- directStreamOutput.getPosition(new RowIndexPositionRecorder(base));
- }
- }
-
@Override
OrcProto.ColumnEncoding getEncoding() {
// Returns the encoding used for the last call to writeStripe
if (useDictionaryEncoding) {
+ if(isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+ setDictionarySize(dictionary.size()).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DICTIONARY).
setDictionarySize(dictionary.size()).build();
} else {
+ if(isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
return OrcProto.ColumnEncoding.newBuilder().setKind(
OrcProto.ColumnEncoding.Kind.DIRECT).build();
}
@@ -956,7 +996,8 @@ class WriterImpl implements Writer, Memo
private static class BinaryTreeWriter extends TreeWriter {
private final PositionedOutputStream stream;
- private final RunLengthIntegerWriter length;
+ private final IntegerWriter length;
+ private boolean isDirectV2 = true;
BinaryTreeWriter(int columnId,
ObjectInspector inspector,
@@ -965,12 +1006,23 @@ class WriterImpl implements Writer, Memo
super(columnId, inspector, writer, nullable);
this.stream = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.length = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.length = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1003,22 +1055,34 @@ class WriterImpl implements Writer, Memo
Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
private static class TimestampTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter seconds;
- private final RunLengthIntegerWriter nanos;
+ private final IntegerWriter seconds;
+ private final IntegerWriter nanos;
+ private final boolean isDirectV2;
TimestampTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
- this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true);
- this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.seconds = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true, isDirectV2);
+ this.nanos = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1064,7 +1128,8 @@ class WriterImpl implements Writer, Memo
}
private static class DateTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter writer;
+ private final IntegerWriter writer;
+ private final boolean isDirectV2;
DateTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1073,7 +1138,8 @@ class WriterImpl implements Writer, Memo
super(columnId, inspector, writer, nullable);
PositionedOutputStream out = writer.createStream(id,
OrcProto.Stream.Kind.DATA);
- this.writer = new RunLengthIntegerWriter(out, true);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(out, true, isDirectV2);
recordPosition(rowIndexPosition);
}
@@ -1101,24 +1167,46 @@ class WriterImpl implements Writer, Memo
super.recordPosition(recorder);
writer.getPosition(recorder);
}
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
}
private static class DecimalTreeWriter extends TreeWriter {
private final PositionedOutputStream valueStream;
- private final RunLengthIntegerWriter scaleStream;
+ private final IntegerWriter scaleStream;
+ private final boolean isDirectV2;
DecimalTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- scaleStream = new RunLengthIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true);
+ this.scaleStream = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1191,25 +1279,36 @@ class WriterImpl implements Writer, Memo
}
private static class ListTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
ListTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
childrenWriters = new TreeWriter[1];
childrenWriters[0] =
createTreeWriter(listObjectInspector.getListElementObjectInspector(),
writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1241,26 +1340,37 @@ class WriterImpl implements Writer, Memo
}
private static class MapTreeWriter extends TreeWriter {
- private final RunLengthIntegerWriter lengths;
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
MapTreeWriter(int columnId,
ObjectInspector inspector,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
MapObjectInspector insp = (MapObjectInspector) inspector;
childrenWriters = new TreeWriter[2];
childrenWriters[0] =
createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
childrenWriters[1] =
createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
- lengths =
- new RunLengthIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2);
recordPosition(rowIndexPosition);
}
@Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
void write(Object obj) throws IOException {
super.write(obj);
if (obj != null) {
@@ -1346,8 +1456,7 @@ class WriterImpl implements Writer, Memo
private static TreeWriter createTreeWriter(ObjectInspector inspector,
StreamFactory streamFactory,
- boolean nullable
- ) throws IOException {
+ boolean nullable) throws IOException {
switch (inspector.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
Modified: hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto (original)
+++ hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto Mon Aug 12 15:03:30 2013
@@ -73,6 +73,8 @@ message ColumnEncoding {
enum Kind {
DIRECT = 0;
DICTIONARY = 1;
+ DIRECT_V2 = 2;
+ DICTIONARY_V2 = 3;
}
required Kind kind = 1;
optional uint32 dictionarySize = 2;
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.primitives.Longs;
+
+ /**
+ * Round trip tests for SerializationUtils.writeInts and
+ * SerializationUtils.readInts, one test per supported fixed bit width. All
+ * tests draw their input from one shared Random with a fixed seed.
+ */
+ public class TestBitPack {
+
+ // number of values written and read back by each test
+ private static final int SIZE = 100;
+ private static Random rand = new Random(100);
+
+ // NOTE: despite its name this zigzag encodes each element, no deltas are
+ // computed
+ private long[] deltaEncode(long[] inp) {
+ long[] output = new long[inp.length];
+ for(int i = 0; i < inp.length; i++) {
+ output[i] = SerializationUtils.zigzagEncode(inp[i]);
+ }
+ return output;
+ }
+
+ // returns a value in [0, n) built from the lower 63 bits of rng.nextLong();
+ // draws again while bits - val + (n - 1) overflows to a negative number
+ private long nextLong(Random rng, long n) {
+ long bits, val;
+ do {
+ bits = (rng.nextLong() << 1) >>> 1;
+ val = bits % n;
+ } while (bits - val + (n - 1) < 0L);
+ return val;
+ }
+
+ // Generates SIZE random values for the given bit width (even values are
+ // negated), zigzag encodes them, bit packs them with the width computed
+ // from the range of the encoded values, reads them back and checks that the
+ // computed width equals numBits and that the decoded values match the input.
+ private void runTest(int numBits) throws IOException {
+ long[] inp = new long[SIZE];
+ for(int i = 0; i < SIZE; i++) {
+ long val = 0;
+ if (numBits <= 32) {
+ if (numBits == 1) {
+ val = -1 * rand.nextInt(2);
+ } else {
+ val = rand.nextInt((int) Math.pow(2, numBits - 1));
+ }
+ } else {
+ val = nextLong(rand, (long) Math.pow(2, numBits - 2));
+ }
+ if (val % 2 == 0) {
+ val = -val;
+ }
+ inp[i] = val;
+ }
+ long[] deltaEncoded = deltaEncode(inp);
+ long minInput = Collections.min(Longs.asList(deltaEncoded));
+ long maxInput = Collections.max(Longs.asList(deltaEncoded));
+ long rangeInput = maxInput - minInput;
+ int fixedWidth = SerializationUtils.findClosestNumBits(rangeInput);
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ OutStream output = new OutStream("test", SIZE, null, collect);
+ SerializationUtils.writeInts(deltaEncoded, 0, deltaEncoded.length,
+ fixedWidth, output);
+ output.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ long[] buff = new long[SIZE];
+ SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
+ InStream.create("test", inBuf, null, SIZE));
+ for(int i = 0; i < SIZE; i++) {
+ buff[i] = SerializationUtils.zigzagDecode(buff[i]);
+ }
+ assertEquals(numBits, fixedWidth);
+ assertArrayEquals(inp, buff);
+ }
+
+ @Test
+ public void test01BitPacking1Bit() throws IOException {
+ runTest(1);
+ }
+
+ @Test
+ public void test02BitPacking2Bit() throws IOException {
+ runTest(2);
+ }
+
+ @Test
+ public void test03BitPacking3Bit() throws IOException {
+ runTest(3);
+ }
+
+ @Test
+ public void test04BitPacking4Bit() throws IOException {
+ runTest(4);
+ }
+
+ @Test
+ public void test05BitPacking5Bit() throws IOException {
+ runTest(5);
+ }
+
+ @Test
+ public void test06BitPacking6Bit() throws IOException {
+ runTest(6);
+ }
+
+ @Test
+ public void test07BitPacking7Bit() throws IOException {
+ runTest(7);
+ }
+
+ @Test
+ public void test08BitPacking8Bit() throws IOException {
+ runTest(8);
+ }
+
+ @Test
+ public void test09BitPacking9Bit() throws IOException {
+ runTest(9);
+ }
+
+ @Test
+ public void test10BitPacking10Bit() throws IOException {
+ runTest(10);
+ }
+
+ @Test
+ public void test11BitPacking11Bit() throws IOException {
+ runTest(11);
+ }
+
+ @Test
+ public void test12BitPacking12Bit() throws IOException {
+ runTest(12);
+ }
+
+ @Test
+ public void test13BitPacking13Bit() throws IOException {
+ runTest(13);
+ }
+
+ @Test
+ public void test14BitPacking14Bit() throws IOException {
+ runTest(14);
+ }
+
+ @Test
+ public void test15BitPacking15Bit() throws IOException {
+ runTest(15);
+ }
+
+ @Test
+ public void test16BitPacking16Bit() throws IOException {
+ runTest(16);
+ }
+
+ @Test
+ public void test17BitPacking17Bit() throws IOException {
+ runTest(17);
+ }
+
+ @Test
+ public void test18BitPacking18Bit() throws IOException {
+ runTest(18);
+ }
+
+ @Test
+ public void test19BitPacking19Bit() throws IOException {
+ runTest(19);
+ }
+
+ @Test
+ public void test20BitPacking20Bit() throws IOException {
+ runTest(20);
+ }
+
+ @Test
+ public void test21BitPacking21Bit() throws IOException {
+ runTest(21);
+ }
+
+ @Test
+ public void test22BitPacking22Bit() throws IOException {
+ runTest(22);
+ }
+
+ @Test
+ public void test23BitPacking23Bit() throws IOException {
+ runTest(23);
+ }
+
+ @Test
+ public void test24BitPacking24Bit() throws IOException {
+ runTest(24);
+ }
+
+ @Test
+ public void test26BitPacking26Bit() throws IOException {
+ runTest(26);
+ }
+
+ @Test
+ public void test28BitPacking28Bit() throws IOException {
+ runTest(28);
+ }
+
+ @Test
+ public void test30BitPacking30Bit() throws IOException {
+ runTest(30);
+ }
+
+ @Test
+ public void test32BitPacking32Bit() throws IOException {
+ runTest(32);
+ }
+
+ @Test
+ public void test40BitPacking40Bit() throws IOException {
+ runTest(40);
+ }
+
+ @Test
+ public void test48BitPacking48Bit() throws IOException {
+ runTest(48);
+ }
+
+ @Test
+ public void test56BitPacking56Bit() throws IOException {
+ runTest(56);
+ }
+
+ @Test
+ public void test64BitPacking64Bit() throws IOException {
+ runTest(64);
+ }
+ }
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Test;
+
+/**
+ * Round-trip tests that write integers through {@link RunLengthIntegerWriterV2}
+ * and read them back through {@link RunLengthIntegerReaderV2}, exercising
+ * sequential reads, seeks to previously recorded positions, and skips.
+ */
+public class TestIntegerCompressionReader {
+
+  /**
+   * Returns the value that {@link #runSeekTest} writes at a given index, so
+   * the writer loop and both verification loops share one definition.
+   *
+   * <ul>
+   * <li>indexes 0..1023: {@code index / 4} (each value repeated four times)</li>
+   * <li>indexes 1024..2047: {@code 2 * index} (steadily increasing)</li>
+   * <li>indexes 2048 and up: the pre-generated random values</li>
+   * </ul>
+   *
+   * @param index position of the value in the written sequence
+   * @param junk random values used for indexes 2048 and above
+   * @return the value expected at {@code index}
+   */
+  private static int expectedSeekValue(int index, int[] junk) {
+    if (index < 1024) {
+      return index / 4;
+    } else if (index < 2048) {
+      return 2 * index;
+    }
+    return junk[index - 2048];
+  }
+
+  /**
+   * Writes 4096 values while recording the writer position before each one,
+   * then checks that (a) reading sequentially returns every value and
+   * (b) seeking to each recorded position, last to first, returns the value
+   * written at that position.
+   *
+   * <p>Both checks cover all 4096 values. Earlier they stopped at index 2047,
+   * which left the random section written but never verified.
+   *
+   * @param codec compression codec handed to the streams; {@code null} is
+   *        passed by the uncompressed test
+   * @throws Exception if writing, reading or seeking fails
+   */
+  public void runSeekTest(CompressionCodec codec) throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 1000, codec, collect), true);
+    TestInStream.PositionCollector[] positions =
+        new TestInStream.PositionCollector[4096];
+    // Fixed seed keeps the random section identical from run to run.
+    Random random = new Random(99);
+    int[] junk = new int[2048];
+    for (int i = 0; i < junk.length; ++i) {
+      junk[i] = random.nextInt();
+    }
+    // test runs, incrementing runs, non-runs
+    for (int i = 0; i < positions.length; ++i) {
+      positions[i] = new TestInStream.PositionCollector();
+      out.getPosition(positions[i]);
+      out.write(expectedSeekValue(i, junk));
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+        ("test", inBuf, codec, 1000), true);
+    // Sequential pass over everything that was written.
+    for (int i = 0; i < positions.length; ++i) {
+      int x = (int) in.next();
+      assertEquals("sequential read at index " + i,
+          expectedSeekValue(i, junk), x);
+    }
+    // Seek backwards so every seek lands before the current read position.
+    for (int i = positions.length - 1; i >= 0; --i) {
+      in.seek(positions[i]);
+      int x = (int) in.next();
+      assertEquals("read after seek to index " + i,
+          expectedSeekValue(i, junk), x);
+    }
+  }
+
+  /** Runs the seek round trip with no compression codec. */
+  @Test
+  public void testUncompressedSeek() throws Exception {
+    runSeekTest(null);
+  }
+
+  /** Runs the seek round trip with a {@link ZlibCodec}. */
+  @Test
+  public void testCompressedSeek() throws Exception {
+    runSeekTest(new ZlibCodec());
+  }
+
+  /**
+   * Writes 2048 values, then reads every tenth one, skipping the nine values
+   * in between, and checks each value read.
+   *
+   * @throws Exception if writing, reading or skipping fails
+   */
+  @Test
+  public void testSkips() throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 100, null, collect), true);
+    for (int i = 0; i < 2048; ++i) {
+      if (i < 1024) {
+        out.write(i);
+      } else {
+        out.write(256 * i);
+      }
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
+        ("test", inBuf, null, 100), true);
+    for (int i = 0; i < 2048; i += 10) {
+      int x = (int) in.next();
+      int expected = (i < 1024) ? i : 256 * i;
+      assertEquals("value at index " + i, expected, x);
+      // The last value read is index 2040; only 7 values follow it, so the
+      // 9-value skip is limited to iterations that still have a successor.
+      if (i < 2038) {
+        in.skip(9);
+      }
+      // Skipping zero values is also exercised on every iteration.
+      in.skip(0);
+    }
+  }
+}