You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/12/12 00:28:08 UTC
[11/16] hive git commit: HIVE-11890. Create ORC submodue. (omalley
reviewed by prasanthj)
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..5f2a673
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * A reader that reads a sequence of light weight compressed integers. Refer
+ * {@link RunLengthIntegerWriterV2} for description of various lightweight
+ * compression techniques.
+ */
+public class RunLengthIntegerReaderV2 implements IntegerReader {
+ public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class);
+
+ private InStream input;
+ private final boolean signed;
+ private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+ private boolean isRepeating = false;
+ private int numLiterals = 0;
+ private int used = 0;
+ private final boolean skipCorrupt;
+ private final SerializationUtils utils;
+ private RunLengthIntegerWriterV2.EncodingType currentEncoding;
+
+ public RunLengthIntegerReaderV2(InStream input, boolean signed,
+ boolean skipCorrupt) throws IOException {
+ this.input = input;
+ this.signed = signed;
+ this.skipCorrupt = skipCorrupt;
+ this.utils = new SerializationUtils();
+ }
+
+ private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values();
+ private void readValues(boolean ignoreEof) throws IOException {
+ // read the first 2 bits and determine the encoding type
+ isRepeating = false;
+ int firstByte = input.read();
+ if (firstByte < 0) {
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ }
+ used = numLiterals = 0;
+ return;
+ }
+ currentEncoding = encodings[(firstByte >>> 6) & 0x03];
+ switch (currentEncoding) {
+ case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
+ case DIRECT: readDirectValues(firstByte); break;
+ case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
+ case DELTA: readDeltaValues(firstByte); break;
+ default: throw new IOException("Unknown encoding " + currentEncoding);
+ }
+ }
+
+ private void readDeltaValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fb = (firstByte >>> 1) & 0x1f;
+ if (fb != 0) {
+ fb = utils.decodeBitWidth(fb);
+ }
+
+ // extract the blob run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+
+ // read the first value stored as vint
+ long firstVal = 0;
+ if (signed) {
+ firstVal = utils.readVslong(input);
+ } else {
+ firstVal = utils.readVulong(input);
+ }
+
+ // store first value to result buffer
+ long prevVal = firstVal;
+ literals[numLiterals++] = firstVal;
+
+ // if fixed bits is 0 then all values have fixed delta
+ if (fb == 0) {
+ // read the fixed delta value stored as vint (deltas can be negative even
+ // if all number are positive)
+ long fd = utils.readVslong(input);
+ if (fd == 0) {
+ isRepeating = true;
+ assert numLiterals == 1;
+ Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
+ numLiterals += len;
+ } else {
+ // add fixed deltas to adjacent values
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ }
+ }
+ } else {
+ long deltaBase = utils.readVslong(input);
+ // add delta base and first value
+ literals[numLiterals++] = firstVal + deltaBase;
+ prevVal = literals[numLiterals - 1];
+ len -= 1;
+
+ // write the unpacked values, add it to previous value and store final
+ // value to result buffer. if the delta base value is negative then it
+ // is a decreasing sequence else an increasing sequence
+ utils.readInts(literals, numLiterals, len, fb, input);
+ while (len > 0) {
+ if (deltaBase < 0) {
+ literals[numLiterals] = prevVal - literals[numLiterals];
+ } else {
+ literals[numLiterals] = prevVal + literals[numLiterals];
+ }
+ prevVal = literals[numLiterals];
+ len--;
+ numLiterals++;
+ }
+ }
+ }
+
+ private void readPatchedBaseValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = utils.decodeBitWidth(fbo);
+
+ // extract the run length of data blob
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are always one off
+ len += 1;
+
+ // extract the number of bytes occupied by base
+ int thirdByte = input.read();
+ int bw = (thirdByte >>> 5) & 0x07;
+ // base width is one off
+ bw += 1;
+
+ // extract patch width
+ int pwo = thirdByte & 0x1f;
+ int pw = utils.decodeBitWidth(pwo);
+
+ // read fourth byte and extract patch gap width
+ int fourthByte = input.read();
+ int pgw = (fourthByte >>> 5) & 0x07;
+ // patch gap width is one off
+ pgw += 1;
+
+ // extract the length of the patch list
+ int pl = fourthByte & 0x1f;
+
+ // read the next base width number of bytes to extract base value
+ long base = utils.bytesToLongBE(input, bw);
+ long mask = (1L << ((bw * 8) - 1));
+ // if MSB of base value is 1 then base is negative value else positive
+ if ((base & mask) != 0) {
+ base = base & ~mask;
+ base = -base;
+ }
+
+ // unpack the data blob
+ long[] unpacked = new long[len];
+ utils.readInts(unpacked, 0, len, fb, input);
+
+ // unpack the patch blob
+ long[] unpackedPatch = new long[pl];
+
+ if ((pw + pgw) > 64 && !skipCorrupt) {
+ throw new IOException("Corruption in ORC data encountered. To skip" +
+ " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" +
+ " true");
+ }
+ int bitSize = utils.getClosestFixedBits(pw + pgw);
+ utils.readInts(unpackedPatch, 0, pl, bitSize, input);
+
+ // apply the patch directly when decoding the packed data
+ int patchIdx = 0;
+ long currGap = 0;
+ long currPatch = 0;
+ long patchMask = ((1L << pw) - 1);
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ long actualGap = 0;
+
+ // special case: gap is >255 then patch value will be 0.
+ // if gap is <=255 then patch value cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // unpack data blob, patch it (if required), add base to get final result
+ for(int i = 0; i < unpacked.length; i++) {
+ if (i == actualGap) {
+ // extract the patch value
+ long patchedVal = unpacked[i] | (currPatch << fb);
+
+ // add base to patched value
+ literals[numLiterals++] = base + patchedVal;
+
+ // increment the patch to point to next entry in patch list
+ patchIdx++;
+
+ if (patchIdx < pl) {
+ // read the next gap and patch
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ actualGap = 0;
+
+ // special case: gap is >255 then patch will be 0. if gap is
+ // <=255 then patch cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // next gap is relative to the current gap
+ actualGap += i;
+ }
+ } else {
+ // no patching required. add base to unpacked value to get final value
+ literals[numLiterals++] = base + unpacked[i];
+ }
+ }
+
+ }
+
+ private void readDirectValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = utils.decodeBitWidth(fbo);
+
+ // extract the run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are one off
+ len += 1;
+
+ // write the unpacked values and zigzag decode to result buffer
+ utils.readInts(literals, numLiterals, len, fb, input);
+ if (signed) {
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
+ numLiterals++;
+ }
+ } else {
+ numLiterals += len;
+ }
+ }
+
+ private void readShortRepeatValues(int firstByte) throws IOException {
+
+ // read the number of bytes occupied by the value
+ int size = (firstByte >>> 3) & 0x07;
+ // #bytes are one off
+ size += 1;
+
+ // read the run length
+ int len = firstByte & 0x07;
+ // run lengths values are stored only after MIN_REPEAT value is met
+ len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+ // read the repeated value which is store using fixed bytes
+ long val = utils.bytesToLongBE(input, size);
+
+ if (signed) {
+ val = utils.zigzagDecode(val);
+ }
+
+ if (numLiterals != 0) {
+ // Currently this always holds, which makes peekNextAvailLength simpler.
+ // If this changes, peekNextAvailLength should be adjusted accordingly.
+ throw new AssertionError("readValues called with existing values present");
+ }
+ // repeat the value for length times
+ isRepeating = true;
+ // TODO: this is not so useful and V1 reader doesn't do that. Fix? Same if delta == 0
+ for(int i = 0; i < len; i++) {
+ literals[i] = val;
+ }
+ numLiterals = len;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ @Override
+ public long next() throws IOException {
+ long result;
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues(false);
+ }
+ result = literals[used++];
+ return result;
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two
+ // parts
+ while (consumed > 0) {
+ numLiterals = 0;
+ readValues(false);
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ @Override
+ public void skip(long numValues) throws IOException {
+ while (numValues > 0) {
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues(false);
+ }
+ long consume = Math.min(numValues, numLiterals - used);
+ used += consume;
+ numValues -= consume;
+ }
+ }
+
+ @Override
+ public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value of null for int type in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls in Vectorization for int types is 1
+ // and given that non null value can also be 1, we need to check for isNull also
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && (previous.vector[i - 1] != previous.vector[i] ||
+ previous.isNull[i - 1] != previous.isNull[i])) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
+ @Override
+ public void setInStream(InStream data) {
+ input = data;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
new file mode 100644
index 0000000..3e5f2e2
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of integers. A control byte is written before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
+ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
+ * literal vint values follow.
+ */
+public class RunLengthIntegerWriter implements IntegerWriter {
+ static final int MIN_REPEAT_SIZE = 3;
+ static final int MAX_DELTA = 127;
+ static final int MIN_DELTA = -128;
+ static final int MAX_LITERAL_SIZE = 128;
+ private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private final long[] literals = new long[MAX_LITERAL_SIZE];
+ private int numLiterals = 0;
+ private long delta = 0;
+ private boolean repeat = false;
+ private int tailRunLength = 0;
+ private SerializationUtils utils;
+
+ public RunLengthIntegerWriter(PositionedOutputStream output,
+ boolean signed) {
+ this.output = output;
+ this.signed = signed;
+ this.utils = new SerializationUtils();
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+ if (repeat) {
+ output.write(numLiterals - MIN_REPEAT_SIZE);
+ output.write((byte) delta);
+ if (signed) {
+ utils.writeVslong(output, literals[0]);
+ } else {
+ utils.writeVulong(output, literals[0]);
+ }
+ } else {
+ output.write(-numLiterals);
+ for(int i=0; i < numLiterals; ++i) {
+ if (signed) {
+ utils.writeVslong(output, literals[i]);
+ } else {
+ utils.writeVulong(output, literals[i]);
+ }
+ }
+ }
+ repeat = false;
+ numLiterals = 0;
+ tailRunLength = 0;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writeValues();
+ output.flush();
+ }
+
+ @Override
+ public void write(long value) throws IOException {
+ if (numLiterals == 0) {
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ } else if (repeat) {
+ if (value == literals[0] + delta * numLiterals) {
+ numLiterals += 1;
+ if (numLiterals == MAX_REPEAT_SIZE) {
+ writeValues();
+ }
+ } else {
+ writeValues();
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ }
+ } else {
+ if (tailRunLength == 1) {
+ delta = value - literals[numLiterals - 1];
+ if (delta < MIN_DELTA || delta > MAX_DELTA) {
+ tailRunLength = 1;
+ } else {
+ tailRunLength = 2;
+ }
+ } else if (value == literals[numLiterals - 1] + delta) {
+ tailRunLength += 1;
+ } else {
+ delta = value - literals[numLiterals - 1];
+ if (delta < MIN_DELTA || delta > MAX_DELTA) {
+ tailRunLength = 1;
+ } else {
+ tailRunLength = 2;
+ }
+ }
+ if (tailRunLength == MIN_REPEAT_SIZE) {
+ if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+ repeat = true;
+ numLiterals += 1;
+ } else {
+ numLiterals -= MIN_REPEAT_SIZE - 1;
+ long base = literals[numLiterals];
+ writeValues();
+ literals[0] = base;
+ repeat = true;
+ numLiterals = MIN_REPEAT_SIZE;
+ }
+ } else {
+ literals[numLiterals++] = value;
+ if (numLiterals == MAX_LITERAL_SIZE) {
+ writeValues();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..fab2801
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java
@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs light weight compression over sequence of integers.
+ * <p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * </p>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required to encode base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.</li>
+ * <li>Data blob - Base reduced values as stored using fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.</li>
+ * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - zigzag encoded value written as varint</li>
+ * <li>Delta base - zigzag encoded value written as varint</li>
+ * <li>Delta blob - only positive values. monotonicity and orderness are decided
+ * based on the sign of the base value and delta base</li>
+ * </ul>
+ * </p>
+ */
+public class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+ public enum EncodingType {
+ SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+ }
+
+ static final int MAX_SCOPE = 512;
+ static final int MIN_REPEAT = 3;
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ private long prevDelta = 0;
+ private int fixedRunLength = 0;
+ private int variableRunLength = 0;
+ private final long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private EncodingType encoding;
+ private int numLiterals;
+ private final long[] zigzagLiterals = new long[MAX_SCOPE];
+ private final long[] baseRedLiterals = new long[MAX_SCOPE];
+ private final long[] adjDeltas = new long[MAX_SCOPE];
+ private long fixedDelta;
+ private int zzBits90p;
+ private int zzBits100p;
+ private int brBits95p;
+ private int brBits100p;
+ private int bitsDeltaMax;
+ private int patchWidth;
+ private int patchGapWidth;
+ private int patchLength;
+ private long[] gapVsPatchList;
+ private long min;
+ private boolean isFixedDelta;
+ private SerializationUtils utils;
+ private boolean alignedBitpacking;
+
+ RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+ this(output, signed, true);
+ }
+
+ public RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
+ boolean alignedBitpacking) {
+ this.output = output;
+ this.signed = signed;
+ this.alignedBitpacking = alignedBitpacking;
+ this.utils = new SerializationUtils();
+ clear();
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+
+ if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+ writeShortRepeatValues();
+ } else if (encoding.equals(EncodingType.DIRECT)) {
+ writeDirectValues();
+ } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+ writePatchedBaseValues();
+ } else {
+ writeDeltaValues();
+ }
+
+ // clear all the variables
+ clear();
+ }
+ }
+
+ private void writeDeltaValues() throws IOException {
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int efb = 0;
+
+ if (alignedBitpacking) {
+ fb = utils.getClosestAlignedFixedBits(fb);
+ }
+
+ if (isFixedDelta) {
+ // if fixed run length is greater than threshold then it will be fixed
+ // delta sequence with delta value 0 else fixed delta sequence with
+ // non-zero delta value
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ // fixed width 0 is used for long repeating values.
+ // sequences that require only 1 bit to encode will have an additional bit
+ if (fb == 1) {
+ fb = 2;
+ }
+ efb = utils.encodeBitWidth(fb);
+ efb = efb << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ final int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ final int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ final int headerSecondByte = len & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first value from zigzag literal array
+ if (signed) {
+ utils.writeVslong(output, literals[0]);
+ } else {
+ utils.writeVulong(output, literals[0]);
+ }
+
+ if (isFixedDelta) {
+ // if delta is fixed then we don't need to store delta blob
+ utils.writeVslong(output, fixedDelta);
+ } else {
+ // store the first value as delta value using zigzag encoding
+ utils.writeVslong(output, adjDeltas[0]);
+
+ // adjacent delta values are bit packed. The length of adjDeltas array is
+ // always one less than the number of literals (delta difference for n
+ // elements is n-1). We have already written one element, write the
+ // remaining numLiterals - 2 elements here
+ utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output);
+ }
+ }
+
+ private void writePatchedBaseValues() throws IOException {
+
+ // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+ // because patch is applied to MSB bits. For example: If fixed bit width of
+ // base value is 7 bits and if patch is 3 bits, the actual value is
+ // constructed by shifting the patch to left by 7 positions.
+ // actual_value = patch << 7 | base_value
+ // So, if we align base_value then actual_value can not be reconstructed.
+
+ // write the number of fixed bits required in next 5 bits
+ final int fb = brBits95p;
+ final int efb = utils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ final int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ final int headerSecondByte = variableRunLength & 0xff;
+
+ // if the min value is negative toggle the sign
+ final boolean isNegative = min < 0 ? true : false;
+ if (isNegative) {
+ min = -min;
+ }
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width. The additional bit is used to store the sign
+ // of the base value.
+ final int baseWidth = utils.findClosestNumBits(min) + 1;
+ final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ final int bb = (baseBytes - 1) << 5;
+
+ // if the base value is negative then set MSB to 1
+ if (isNegative) {
+ min |= (1L << ((baseBytes * 8) - 1));
+ }
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
+
+ // fourth byte contains 3 bits for page gap width and 5 bits for
+ // patch length
+ final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for(int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((min >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // base reduced literals are bit packed
+ int closestFixedBits = utils.getClosestFixedBits(fb);
+
+ utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits,
+ output);
+
+ // write patch list
+ closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
+
+ utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
+ output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ /**
+ * Store the opcode in 2 MSB bits
+ * @return opcode
+ */
+ private int getOpcode() {
+ return encoding.ordinal() << 6;
+ }
+
+ private void writeDirectValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = zzBits100p;
+
+ if (alignedBitpacking) {
+ fb = utils.getClosestAlignedFixedBits(fb);
+ }
+
+ final int efb = utils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ final int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ final int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit packing the zigzag encoded literals
+ utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = utils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
+ final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // write encoding type in top 2 bits
+ int header = getOpcode();
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the repeating value in big endian byte order
+ for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ fixedRunLength = 0;
+ }
+
+ private void determineEncoding() {
+
+ // we need to compute zigzag values for DIRECT encoding if we decide to
+ // break early for delta overflows or for shorter runs
+ computeZigZagLiterals();
+
+ zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
+
+ // not a big win for shorter runs to determine encoding
+ if (numLiterals <= MIN_REPEAT) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // DELTA encoding check
+
+ // for identifying monotonic sequences
+ boolean isIncreasing = true;
+ boolean isDecreasing = true;
+ this.isFixedDelta = true;
+
+ this.min = literals[0];
+ long max = literals[0];
+ final long initialDelta = literals[1] - literals[0];
+ long currDelta = initialDelta;
+ long deltaMax = initialDelta;
+ this.adjDeltas[0] = initialDelta;
+
+ for (int i = 1; i < numLiterals; i++) {
+ final long l1 = literals[i];
+ final long l0 = literals[i - 1];
+ currDelta = l1 - l0;
+ min = Math.min(min, l1);
+ max = Math.max(max, l1);
+
+ isIncreasing &= (l0 <= l1);
+ isDecreasing &= (l0 >= l1);
+
+ isFixedDelta &= (currDelta == initialDelta);
+ if (i > 1) {
+ adjDeltas[i - 1] = Math.abs(currDelta);
+ deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
+ }
+ }
+
+ // its faster to exit under delta overflow condition without checking for
+ // PATCHED_BASE condition as encoding using DIRECT is faster and has less
+ // overhead than PATCHED_BASE
+ if (!utils.isSafeSubtract(max, min)) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // invariant - subtracting any number from any other in the literals after
+ // this point won't overflow
+
+ // if min is equal to max then the delta is 0, this condition happens for
+ // fixed values run >10 which cannot be encoded with SHORT_REPEAT
+ if (min == max) {
+ assert isFixedDelta : min + "==" + max +
+ ", isFixedDelta cannot be false";
+ assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
+ fixedDelta = 0;
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ if (isFixedDelta) {
+ assert currDelta == initialDelta
+ : "currDelta should be equal to initialDelta for fixed delta encoding";
+ encoding = EncodingType.DELTA;
+ fixedDelta = currDelta;
+ return;
+ }
+
+ // if initialDelta is 0 then we cannot delta encode as we cannot identify
+ // the sign of deltas (increasing or decreasing)
+ if (initialDelta != 0) {
+ // stores the number of bits required for packing delta blob in
+ // delta encoding
+ bitsDeltaMax = utils.findClosestNumBits(deltaMax);
+
+ // monotonic condition
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+ }
+
+ // PATCHED_BASE encoding check
+
+ // percentile values are computed for the zigzag encoded values. if the
+ // number of bit requirement between 90th and 100th percentile varies
+ // beyond a threshold then we need to patch the values. if the variation
+ // is not significant then we can use direct encoding
+
+ zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
+ int diffBitsLH = zzBits100p - zzBits90p;
+
+ // if the difference between 90th percentile and 100th percentile fixed
+ // bits is > 1 then we need patch the values
+ if (diffBitsLH > 1) {
+
+ // patching is done only on base reduced values.
+ // remove base from literals
+ for (int i = 0; i < numLiterals; i++) {
+ baseRedLiterals[i] = literals[i] - min;
+ }
+
+ // 95th percentile width is used to determine max allowed value
+ // after which patching will be done
+ brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
+
+ // 100th percentile is used to compute the max patch width
+ brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
+
+ // after base reducing the values, if the difference in bits between
+ // 95th percentile and 100th percentile value is zero then there
+ // is no point in patching the values, in which case we will
+ // fallback to DIRECT encoding.
+ // The decision to use patched base was based on zigzag values, but the
+ // actual patching is done on base reduced literals.
+ if ((brBits100p - brBits95p) != 0) {
+ encoding = EncodingType.PATCHED_BASE;
+ preparePatchedBlob();
+ return;
+ } else {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ } else {
+ // if difference in bits between 95th percentile and 100th percentile is
+ // 0, then patch length will become 0. Hence we will fallback to direct
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ }
+
+ private void computeZigZagLiterals() {
+ // populate zigzag encoded literals
+ long zzEncVal = 0;
+ for (int i = 0; i < numLiterals; i++) {
+ if (signed) {
+ zzEncVal = utils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ zigzagLiterals[i] = zzEncVal;
+ }
+ }
+
+ private void preparePatchedBlob() {
+ // mask will be max value beyond which patch will be generated
+ long mask = (1L << brBits95p) - 1;
+
+ // since we are considering only 95 percentile, the size of gap and
+ // patch array can contain only be 5% values
+ patchLength = (int) Math.ceil((numLiterals * 0.05));
+
+ int[] gapList = new int[patchLength];
+ long[] patchList = new long[patchLength];
+
+ // #bit for patch
+ patchWidth = brBits100p - brBits95p;
+ patchWidth = utils.getClosestFixedBits(patchWidth);
+
+ // if patch bit requirement is 64 then it will not possible to pack
+ // gap and patch together in a long. To make sure gap and patch can be
+ // packed together adjust the patch width
+ if (patchWidth == 64) {
+ patchWidth = 56;
+ brBits95p = 8;
+ mask = (1L << brBits95p) - 1;
+ }
+
+ int gapIdx = 0;
+ int patchIdx = 0;
+ int prev = 0;
+ int gap = 0;
+ int maxGap = 0;
+
+ for(int i = 0; i < numLiterals; i++) {
+ // if value is above mask then create the patch and record the gap
+ if (baseRedLiterals[i] > mask) {
+ gap = i - prev;
+ if (gap > maxGap) {
+ maxGap = gap;
+ }
+
+ // gaps are relative, so store the previous patched value index
+ prev = i;
+ gapList[gapIdx++] = gap;
+
+ // extract the most significant bits that are over mask bits
+ long patch = baseRedLiterals[i] >>> brBits95p;
+ patchList[patchIdx++] = patch;
+
+ // strip off the MSB to enable safe bit packing
+ baseRedLiterals[i] &= mask;
+ }
+ }
+
+ // adjust the patch length to number of entries in gap list
+ patchLength = gapIdx;
+
+ // if the element to be patched is the first and only element then
+ // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
+ if (maxGap == 0 && patchLength != 0) {
+ patchGapWidth = 1;
+ } else {
+ patchGapWidth = utils.findClosestNumBits(maxGap);
+ }
+
+ // special case: if the patch gap width is greater than 256, then
+ // we need 9 bits to encode the gap width. But we only have 3 bits in
+ // header to record the gap width. To deal with this case, we will save
+ // two entries in patch list in the following way
+ // 256 gap width => 0 for patch value
+ // actual gap - 256 => actual patch value
+ // We will do the same for gap width = 511. If the element to be patched is
+ // the last element in the scope then gap width will be 511. In this case we
+ // will have 3 entries in the patch list in the following way
+ // 255 gap width => 0 for patch value
+ // 255 gap width => 0 for patch value
+ // 1 gap width => actual patch value
+ if (patchGapWidth > 8) {
+ patchGapWidth = 8;
+ // for gap = 511, we need two additional entries in patch list
+ if (maxGap == 511) {
+ patchLength += 2;
+ } else {
+ patchLength += 1;
+ }
+ }
+
+ // create gap vs patch list
+ gapIdx = 0;
+ patchIdx = 0;
+ gapVsPatchList = new long[patchLength];
+ for(int i = 0; i < patchLength; i++) {
+ long g = gapList[gapIdx++];
+ long p = patchList[patchIdx++];
+ while (g > 255) {
+ gapVsPatchList[i++] = (255L << patchWidth);
+ g -= 255;
+ }
+
+ // store patch value in LSBs and gap in MSBs
+ gapVsPatchList[i] = (g << patchWidth) | p;
+ }
+ }
+
+ /**
+ * clears all the variables
+ */
+ private void clear() {
+ numLiterals = 0;
+ encoding = null;
+ prevDelta = 0;
+ fixedDelta = 0;
+ zzBits90p = 0;
+ zzBits100p = 0;
+ brBits95p = 0;
+ brBits100p = 0;
+ bitsDeltaMax = 0;
+ patchGapWidth = 0;
+ patchLength = 0;
+ patchWidth = 0;
+ gapVsPatchList = null;
+ min = 0;
+ isFixedDelta = true;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (numLiterals != 0) {
+ if (variableRunLength != 0) {
+ determineEncoding();
+ writeValues();
+ } else if (fixedRunLength != 0) {
+ if (fixedRunLength < MIN_REPEAT) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ determineEncoding();
+ writeValues();
+ } else if (fixedRunLength >= MIN_REPEAT
+ && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+ }
+ output.flush();
+ }
+
+ @Override
+ public void write(long val) throws IOException {
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ if (numLiterals == 1) {
+ prevDelta = val - literals[0];
+ literals[numLiterals++] = val;
+ // if both values are same count as fixed run else variable run
+ if (val == literals[0]) {
+ fixedRunLength = 2;
+ variableRunLength = 0;
+ } else {
+ fixedRunLength = 0;
+ variableRunLength = 2;
+ }
+ } else {
+ long currentDelta = val - literals[numLiterals - 1];
+ if (prevDelta == 0 && currentDelta == 0) {
+ // fixed delta run
+
+ literals[numLiterals++] = val;
+
+ // if variable run is non-zero then we are seeing repeating
+ // values at the end of variable run in which case keep
+ // updating variable and fixed runs
+ if (variableRunLength > 0) {
+ fixedRunLength = 2;
+ }
+ fixedRunLength += 1;
+
+ // if fixed run met the minimum condition and if variable
+ // run is non-zero then flush the variable run and shift the
+ // tail fixed runs to start of the buffer
+ if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+ numLiterals -= MIN_REPEAT;
+ variableRunLength -= MIN_REPEAT - 1;
+ // copy the tail fixed runs
+ long[] tailVals = new long[MIN_REPEAT];
+ System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+ // determine variable encoding and flush values
+ determineEncoding();
+ writeValues();
+
+ // shift tail fixed runs to beginning of the buffer
+ for(long l : tailVals) {
+ literals[numLiterals++] = l;
+ }
+ }
+
+ // if fixed runs reached max repeat length then write values
+ if (fixedRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ } else {
+ // variable delta run
+
+ // if fixed run length is non-zero and if it satisfies the
+ // short repeat conditions then write the values as short repeats
+ // else use delta encoding
+ if (fixedRunLength >= MIN_REPEAT) {
+ if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+
+ // if fixed run length is <MIN_REPEAT and current value is
+ // different from previous then treat it as variable run
+ if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+ if (val != literals[numLiterals - 1]) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ }
+ }
+
+ // after writing values re-initialize the variables
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // keep updating variable run lengths
+ prevDelta = val - literals[numLiterals - 1];
+ literals[numLiterals++] = val;
+ variableRunLength += 1;
+
+ // if variable run length reach the max scope, write it
+ if (variableRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void initializeLiterals(long val) {
+ literals[numLiterals++] = val;
+ fixedRunLength = 1;
+ variableRunLength = 1;
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/SerializationUtils.java b/orc/src/java/org/apache/orc/impl/SerializationUtils.java
new file mode 100644
index 0000000..c1162e4
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/SerializationUtils.java
@@ -0,0 +1,1297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.impl.InStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigInteger;
+
+public final class SerializationUtils {
+
+ private final static int BUFFER_SIZE = 64;
+ private final byte[] readBuffer;
+ private final byte[] writeBuffer;
+
+ public SerializationUtils() {
+ this.readBuffer = new byte[BUFFER_SIZE];
+ this.writeBuffer = new byte[BUFFER_SIZE];
+ }
+
+ public void writeVulong(OutputStream output,
+ long value) throws IOException {
+ while (true) {
+ if ((value & ~0x7f) == 0) {
+ output.write((byte) value);
+ return;
+ } else {
+ output.write((byte) (0x80 | (value & 0x7f)));
+ value >>>= 7;
+ }
+ }
+ }
+
+ public void writeVslong(OutputStream output,
+ long value) throws IOException {
+ writeVulong(output, (value << 1) ^ (value >> 63));
+ }
+
+
+ public long readVulong(InputStream in) throws IOException {
+ long result = 0;
+ long b;
+ int offset = 0;
+ do {
+ b = in.read();
+ if (b == -1) {
+ throw new EOFException("Reading Vulong past EOF");
+ }
+ result |= (0x7f & b) << offset;
+ offset += 7;
+ } while (b >= 0x80);
+ return result;
+ }
+
+ public long readVslong(InputStream in) throws IOException {
+ long result = readVulong(in);
+ return (result >>> 1) ^ -(result & 1);
+ }
+
+ public float readFloat(InputStream in) throws IOException {
+ int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
+ (in.read() << 24);
+ return Float.intBitsToFloat(ser);
+ }
+
+ public void writeFloat(OutputStream output,
+ float value) throws IOException {
+ int ser = Float.floatToIntBits(value);
+ output.write(ser & 0xff);
+ output.write((ser >> 8) & 0xff);
+ output.write((ser >> 16) & 0xff);
+ output.write((ser >> 24) & 0xff);
+ }
+
+ public double readDouble(InputStream in) throws IOException {
+ return Double.longBitsToDouble(readLongLE(in));
+ }
+
+ public long readLongLE(InputStream in) throws IOException {
+ in.read(readBuffer, 0, 8);
+ return (((readBuffer[0] & 0xff) << 0)
+ + ((readBuffer[1] & 0xff) << 8)
+ + ((readBuffer[2] & 0xff) << 16)
+ + ((long) (readBuffer[3] & 0xff) << 24)
+ + ((long) (readBuffer[4] & 0xff) << 32)
+ + ((long) (readBuffer[5] & 0xff) << 40)
+ + ((long) (readBuffer[6] & 0xff) << 48)
+ + ((long) (readBuffer[7] & 0xff) << 56));
+ }
+
+ public void writeDouble(OutputStream output,
+ double value) throws IOException {
+ writeLongLE(output, Double.doubleToLongBits(value));
+ }
+
+ private void writeLongLE(OutputStream output, long value) throws IOException {
+ writeBuffer[0] = (byte) ((value >> 0) & 0xff);
+ writeBuffer[1] = (byte) ((value >> 8) & 0xff);
+ writeBuffer[2] = (byte) ((value >> 16) & 0xff);
+ writeBuffer[3] = (byte) ((value >> 24) & 0xff);
+ writeBuffer[4] = (byte) ((value >> 32) & 0xff);
+ writeBuffer[5] = (byte) ((value >> 40) & 0xff);
+ writeBuffer[6] = (byte) ((value >> 48) & 0xff);
+ writeBuffer[7] = (byte) ((value >> 56) & 0xff);
+ output.write(writeBuffer, 0, 8);
+ }
+
+ /**
+ * Write the arbitrarily sized signed BigInteger in vint format.
+ *
+ * Signed integers are encoded using the low bit as the sign bit using zigzag
+ * encoding.
+ *
+ * Each byte uses the low 7 bits for data and the high bit for stop/continue.
+ *
+ * Bytes are stored LSB first.
+ * @param output the stream to write to
+ * @param value the value to output
+ * @throws IOException
+ */
+ public static void writeBigInteger(OutputStream output,
+ BigInteger value) throws IOException {
+ // encode the signed number as a positive integer
+ value = value.shiftLeft(1);
+ int sign = value.signum();
+ if (sign < 0) {
+ value = value.negate();
+ value = value.subtract(BigInteger.ONE);
+ }
+ int length = value.bitLength();
+ while (true) {
+ long lowBits = value.longValue() & 0x7fffffffffffffffL;
+ length -= 63;
+ // write out the next 63 bits worth of data
+ for(int i=0; i < 9; ++i) {
+ // if this is the last byte, leave the high bit off
+ if (length <= 0 && (lowBits & ~0x7f) == 0) {
+ output.write((byte) lowBits);
+ return;
+ } else {
+ output.write((byte) (0x80 | (lowBits & 0x7f)));
+ lowBits >>>= 7;
+ }
+ }
+ value = value.shiftRight(63);
+ }
+ }
+
+ /**
+ * Read the signed arbitrary sized BigInteger BigInteger in vint format
+ * @param input the stream to read from
+ * @return the read BigInteger
+ * @throws IOException
+ */
+ public static BigInteger readBigInteger(InputStream input) throws IOException {
+ BigInteger result = BigInteger.ZERO;
+ long work = 0;
+ int offset = 0;
+ long b;
+ do {
+ b = input.read();
+ if (b == -1) {
+ throw new EOFException("Reading BigInteger past EOF from " + input);
+ }
+ work |= (0x7f & b) << (offset % 63);
+ offset += 7;
+ // if we've read 63 bits, roll them into the result
+ if (offset == 63) {
+ result = BigInteger.valueOf(work);
+ work = 0;
+ } else if (offset % 63 == 0) {
+ result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63));
+ work = 0;
+ }
+ } while (b >= 0x80);
+ if (work != 0) {
+ result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63));
+ }
+ // convert back to a signed number
+ boolean isNegative = result.testBit(0);
+ if (isNegative) {
+ result = result.add(BigInteger.ONE);
+ result = result.negate();
+ }
+ result = result.shiftRight(1);
+ return result;
+ }
+
+ public enum FixedBitSizes {
+ ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+ THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+ TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+ TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+ }
+
+ /**
+ * Count the number of bits required to encode the given value
+ * @param value
+ * @return bits required to store value
+ */
+ public int findClosestNumBits(long value) {
+ int count = 0;
+ while (value != 0) {
+ count++;
+ value = value >>> 1;
+ }
+ return getClosestFixedBits(count);
+ }
+
+ /**
+ * zigzag encode the given value
+ * @param val
+ * @return zigzag encoded value
+ */
+ public long zigzagEncode(long val) {
+ return (val << 1) ^ (val >> 63);
+ }
+
+ /**
+ * zigzag decode the given value
+ * @param val
+ * @return zizag decoded value
+ */
+ public long zigzagDecode(long val) {
+ return (val >>> 1) ^ -(val & 1);
+ }
+
+ /**
+ * Compute the bits required to represent pth percentile value
+ * @param data - array
+ * @param p - percentile value (>=0.0 to <=1.0)
+ * @return pth percentile bits
+ */
+ public int percentileBits(long[] data, int offset, int length,
+ double p) {
+ if ((p > 1.0) || (p <= 0.0)) {
+ return -1;
+ }
+
+ // histogram that store the encoded bit requirement for each values.
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes)
+ int[] hist = new int[32];
+
+ // compute the histogram
+ for(int i = offset; i < (offset + length); i++) {
+ int idx = encodeBitWidth(findClosestNumBits(data[i]));
+ hist[idx] += 1;
+ }
+
+ int perLen = (int) (length * (1.0 - p));
+
+ // return the bits required by pth percentile length
+ for(int i = hist.length - 1; i >= 0; i--) {
+ perLen -= hist[i];
+ if (perLen < 0) {
+ return decodeBitWidth(i);
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Read n bytes in big endian order and convert to long
+ * @return long value
+ */
+ public long bytesToLongBE(InStream input, int n) throws IOException {
+ long out = 0;
+ long val = 0;
+ while (n > 0) {
+ n--;
+ // store it in a long and then shift else integer overflow will occur
+ val = input.read();
+ out |= (val << (n * 8));
+ }
+ return out;
+ }
+
+ /**
+ * Calculate the number of bytes required
+ * @param n - number of values
+ * @param numBits - bit width
+ * @return number of bytes required
+ */
+ int getTotalBytesRequired(int n, int numBits) {
+ return (n * numBits + 7) / 8;
+ }
+
+ /**
+ * For a given fixed bit this function will return the closest available fixed
+ * bit
+ * @param n
+ * @return closest valid fixed bit
+ */
+ public int getClosestFixedBits(int n) {
+ if (n == 0) {
+ return 1;
+ }
+
+ if (n >= 1 && n <= 24) {
+ return n;
+ } else if (n > 24 && n <= 26) {
+ return 26;
+ } else if (n > 26 && n <= 28) {
+ return 28;
+ } else if (n > 28 && n <= 30) {
+ return 30;
+ } else if (n > 30 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ public int getClosestAlignedFixedBits(int n) {
+ if (n == 0 || n == 1) {
+ return 1;
+ } else if (n > 1 && n <= 2) {
+ return 2;
+ } else if (n > 2 && n <= 4) {
+ return 4;
+ } else if (n > 4 && n <= 8) {
+ return 8;
+ } else if (n > 8 && n <= 16) {
+ return 16;
+ } else if (n > 16 && n <= 24) {
+ return 24;
+ } else if (n > 24 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Finds the closest available fixed bit width match and returns its encoded
+ * value (ordinal)
+ * @param n - fixed bit width to encode
+ * @return encoded fixed bit width
+ */
+ public int encodeBitWidth(int n) {
+ n = getClosestFixedBits(n);
+
+ if (n >= 1 && n <= 24) {
+ return n - 1;
+ } else if (n > 24 && n <= 26) {
+ return FixedBitSizes.TWENTYSIX.ordinal();
+ } else if (n > 26 && n <= 28) {
+ return FixedBitSizes.TWENTYEIGHT.ordinal();
+ } else if (n > 28 && n <= 30) {
+ return FixedBitSizes.THIRTY.ordinal();
+ } else if (n > 30 && n <= 32) {
+ return FixedBitSizes.THIRTYTWO.ordinal();
+ } else if (n > 32 && n <= 40) {
+ return FixedBitSizes.FORTY.ordinal();
+ } else if (n > 40 && n <= 48) {
+ return FixedBitSizes.FORTYEIGHT.ordinal();
+ } else if (n > 48 && n <= 56) {
+ return FixedBitSizes.FIFTYSIX.ordinal();
+ } else {
+ return FixedBitSizes.SIXTYFOUR.ordinal();
+ }
+ }
+
+ /**
+ * Decodes the ordinal fixed bit value to actual fixed bit width value
+ * @param n - encoded fixed bit width
+ * @return decoded fixed bit width
+ */
+ public int decodeBitWidth(int n) {
+ if (n >= FixedBitSizes.ONE.ordinal()
+ && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+ return n + 1;
+ } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+ return 26;
+ } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+ return 28;
+ } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+ return 30;
+ } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+ return 32;
+ } else if (n == FixedBitSizes.FORTY.ordinal()) {
+ return 40;
+ } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+ return 48;
+ } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Bitpack and write the input values to underlying output stream
+ * @param input - values to write
+ * @param offset - offset
+ * @param len - length
+ * @param bitSize - bit width
+ * @param output - output stream
+ * @throws IOException
+ */
+ public void writeInts(long[] input, int offset, int len, int bitSize,
+ OutputStream output) throws IOException {
+ if (input == null || input.length < 1 || offset < 0 || len < 1
+ || bitSize < 1) {
+ return;
+ }
+
+ switch (bitSize) {
+ case 1:
+ unrolledBitPack1(input, offset, len, output);
+ return;
+ case 2:
+ unrolledBitPack2(input, offset, len, output);
+ return;
+ case 4:
+ unrolledBitPack4(input, offset, len, output);
+ return;
+ case 8:
+ unrolledBitPack8(input, offset, len, output);
+ return;
+ case 16:
+ unrolledBitPack16(input, offset, len, output);
+ return;
+ case 24:
+ unrolledBitPack24(input, offset, len, output);
+ return;
+ case 32:
+ unrolledBitPack32(input, offset, len, output);
+ return;
+ case 40:
+ unrolledBitPack40(input, offset, len, output);
+ return;
+ case 48:
+ unrolledBitPack48(input, offset, len, output);
+ return;
+ case 56:
+ unrolledBitPack56(input, offset, len, output);
+ return;
+ case 64:
+ unrolledBitPack64(input, offset, len, output);
+ return;
+ default:
+ break;
+ }
+
+ int bitsLeft = 8;
+ byte current = 0;
+ for(int i = offset; i < (offset + len); i++) {
+ long value = input[i];
+ int bitsToWrite = bitSize;
+ while (bitsToWrite > bitsLeft) {
+ // add the bits to the bottom of the current word
+ current |= value >>> (bitsToWrite - bitsLeft);
+ // subtract out the bits we just added
+ bitsToWrite -= bitsLeft;
+ // zero out the bits above bitsToWrite
+ value &= (1L << bitsToWrite) - 1;
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ bitsLeft -= bitsToWrite;
+ current |= value << bitsLeft;
+ if (bitsLeft == 0) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ // flush
+ if (bitsLeft != 8) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ private void unrolledBitPack1(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = (int) (val | ((input[i] & 1) << 7)
+ | ((input[i + 1] & 1) << 6)
+ | ((input[i + 2] & 1) << 5)
+ | ((input[i + 3] & 1) << 4)
+ | ((input[i + 4] & 1) << 3)
+ | ((input[i + 5] & 1) << 2)
+ | ((input[i + 6] & 1) << 1)
+ | (input[i + 7]) & 1);
+ output.write(val);
+ val = 0;
+ }
+
+ if (remainder > 0) {
+ int startShift = 7;
+ for (int i = endUnroll; i < endOffset; i++) {
+ val = (int) (val | (input[i] & 1) << startShift);
+ startShift -= 1;
+ }
+ output.write(val);
+ }
+ }
+
+ private void unrolledBitPack2(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ final int numHops = 4;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = (int) (val | ((input[i] & 3) << 6)
+ | ((input[i + 1] & 3) << 4)
+ | ((input[i + 2] & 3) << 2)
+ | (input[i + 3]) & 3);
+ output.write(val);
+ val = 0;
+ }
+
+ if (remainder > 0) {
+ int startShift = 6;
+ for (int i = endUnroll; i < endOffset; i++) {
+ val = (int) (val | (input[i] & 3) << startShift);
+ startShift -= 2;
+ }
+ output.write(val);
+ }
+ }
+
+ private void unrolledBitPack4(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ final int numHops = 2;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15);
+ output.write(val);
+ val = 0;
+ }
+
+ if (remainder > 0) {
+ int startShift = 4;
+ for (int i = endUnroll; i < endOffset; i++) {
+ val = (int) (val | (input[i] & 15) << startShift);
+ startShift -= 4;
+ }
+ output.write(val);
+ }
+ }
+
+ private void unrolledBitPack8(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 1);
+ }
+
+ private void unrolledBitPack16(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 2);
+ }
+
+ private void unrolledBitPack24(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 3);
+ }
+
+ private void unrolledBitPack32(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 4);
+ }
+
+ private void unrolledBitPack40(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 5);
+ }
+
+ private void unrolledBitPack48(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 6);
+ }
+
+ private void unrolledBitPack56(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 7);
+ }
+
+ private void unrolledBitPack64(long[] input, int offset, int len,
+ OutputStream output) throws IOException {
+ unrolledBitPackBytes(input, offset, len, output, 8);
+ }
+
+ private void unrolledBitPackBytes(long[] input, int offset, int len, OutputStream output, int numBytes) throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int i = offset;
+ for (; i < endUnroll; i = i + numHops) {
+ writeLongBE(output, input, i, numHops, numBytes);
+ }
+
+ if (remainder > 0) {
+ writeRemainingLongs(output, i, input, remainder, numBytes);
+ }
+ }
+
+ private void writeRemainingLongs(OutputStream output, int offset, long[] input, int remainder,
+ int numBytes) throws IOException {
+ final int numHops = remainder;
+
+ int idx = 0;
+ switch (numBytes) {
+ case 1:
+ while (remainder > 0) {
+ writeBuffer[idx] = (byte) (input[offset + idx] & 255);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 2:
+ while (remainder > 0) {
+ writeLongBE2(output, input[offset + idx], idx * 2);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 3:
+ while (remainder > 0) {
+ writeLongBE3(output, input[offset + idx], idx * 3);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 4:
+ while (remainder > 0) {
+ writeLongBE4(output, input[offset + idx], idx * 4);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 5:
+ while (remainder > 0) {
+ writeLongBE5(output, input[offset + idx], idx * 5);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 6:
+ while (remainder > 0) {
+ writeLongBE6(output, input[offset + idx], idx * 6);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 7:
+ while (remainder > 0) {
+ writeLongBE7(output, input[offset + idx], idx * 7);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 8:
+ while (remainder > 0) {
+ writeLongBE8(output, input[offset + idx], idx * 8);
+ remainder--;
+ idx++;
+ }
+ break;
+ default:
+ break;
+ }
+
+ final int toWrite = numHops * numBytes;
+ output.write(writeBuffer, 0, toWrite);
+ }
+
+ private void writeLongBE(OutputStream output, long[] input, int offset, int numHops, int numBytes) throws IOException {
+
+ switch (numBytes) {
+ case 1:
+ writeBuffer[0] = (byte) (input[offset + 0] & 255);
+ writeBuffer[1] = (byte) (input[offset + 1] & 255);
+ writeBuffer[2] = (byte) (input[offset + 2] & 255);
+ writeBuffer[3] = (byte) (input[offset + 3] & 255);
+ writeBuffer[4] = (byte) (input[offset + 4] & 255);
+ writeBuffer[5] = (byte) (input[offset + 5] & 255);
+ writeBuffer[6] = (byte) (input[offset + 6] & 255);
+ writeBuffer[7] = (byte) (input[offset + 7] & 255);
+ break;
+ case 2:
+ writeLongBE2(output, input[offset + 0], 0);
+ writeLongBE2(output, input[offset + 1], 2);
+ writeLongBE2(output, input[offset + 2], 4);
+ writeLongBE2(output, input[offset + 3], 6);
+ writeLongBE2(output, input[offset + 4], 8);
+ writeLongBE2(output, input[offset + 5], 10);
+ writeLongBE2(output, input[offset + 6], 12);
+ writeLongBE2(output, input[offset + 7], 14);
+ break;
+ case 3:
+ writeLongBE3(output, input[offset + 0], 0);
+ writeLongBE3(output, input[offset + 1], 3);
+ writeLongBE3(output, input[offset + 2], 6);
+ writeLongBE3(output, input[offset + 3], 9);
+ writeLongBE3(output, input[offset + 4], 12);
+ writeLongBE3(output, input[offset + 5], 15);
+ writeLongBE3(output, input[offset + 6], 18);
+ writeLongBE3(output, input[offset + 7], 21);
+ break;
+ case 4:
+ writeLongBE4(output, input[offset + 0], 0);
+ writeLongBE4(output, input[offset + 1], 4);
+ writeLongBE4(output, input[offset + 2], 8);
+ writeLongBE4(output, input[offset + 3], 12);
+ writeLongBE4(output, input[offset + 4], 16);
+ writeLongBE4(output, input[offset + 5], 20);
+ writeLongBE4(output, input[offset + 6], 24);
+ writeLongBE4(output, input[offset + 7], 28);
+ break;
+ case 5:
+ writeLongBE5(output, input[offset + 0], 0);
+ writeLongBE5(output, input[offset + 1], 5);
+ writeLongBE5(output, input[offset + 2], 10);
+ writeLongBE5(output, input[offset + 3], 15);
+ writeLongBE5(output, input[offset + 4], 20);
+ writeLongBE5(output, input[offset + 5], 25);
+ writeLongBE5(output, input[offset + 6], 30);
+ writeLongBE5(output, input[offset + 7], 35);
+ break;
+ case 6:
+ writeLongBE6(output, input[offset + 0], 0);
+ writeLongBE6(output, input[offset + 1], 6);
+ writeLongBE6(output, input[offset + 2], 12);
+ writeLongBE6(output, input[offset + 3], 18);
+ writeLongBE6(output, input[offset + 4], 24);
+ writeLongBE6(output, input[offset + 5], 30);
+ writeLongBE6(output, input[offset + 6], 36);
+ writeLongBE6(output, input[offset + 7], 42);
+ break;
+ case 7:
+ writeLongBE7(output, input[offset + 0], 0);
+ writeLongBE7(output, input[offset + 1], 7);
+ writeLongBE7(output, input[offset + 2], 14);
+ writeLongBE7(output, input[offset + 3], 21);
+ writeLongBE7(output, input[offset + 4], 28);
+ writeLongBE7(output, input[offset + 5], 35);
+ writeLongBE7(output, input[offset + 6], 42);
+ writeLongBE7(output, input[offset + 7], 49);
+ break;
+ case 8:
+ writeLongBE8(output, input[offset + 0], 0);
+ writeLongBE8(output, input[offset + 1], 8);
+ writeLongBE8(output, input[offset + 2], 16);
+ writeLongBE8(output, input[offset + 3], 24);
+ writeLongBE8(output, input[offset + 4], 32);
+ writeLongBE8(output, input[offset + 5], 40);
+ writeLongBE8(output, input[offset + 6], 48);
+ writeLongBE8(output, input[offset + 7], 56);
+ break;
+ default:
+ break;
+ }
+
+ final int toWrite = numHops * numBytes;
+ output.write(writeBuffer, 0, toWrite);
+ }
+
+ private void writeLongBE2(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE3(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE4(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE5(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE6(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 40);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 5] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE7(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 48);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 40);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 5] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 6] = (byte) (val >>> 0);
+ }
+
+ private void writeLongBE8(OutputStream output, long val, int wbOffset) {
+ writeBuffer[wbOffset + 0] = (byte) (val >>> 56);
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 48);
+ writeBuffer[wbOffset + 2] = (byte) (val >>> 40);
+ writeBuffer[wbOffset + 3] = (byte) (val >>> 32);
+ writeBuffer[wbOffset + 4] = (byte) (val >>> 24);
+ writeBuffer[wbOffset + 5] = (byte) (val >>> 16);
+ writeBuffer[wbOffset + 6] = (byte) (val >>> 8);
+ writeBuffer[wbOffset + 7] = (byte) (val >>> 0);
+ }
+
+ /**
+ * Read bitpacked integers from input stream
+ * @param buffer - input buffer
+ * @param offset - offset
+ * @param len - length
+ * @param bitSize - bit width
+ * @param input - input stream
+ * @throws IOException
+ */
+ public void readInts(long[] buffer, int offset, int len, int bitSize,
+ InStream input) throws IOException {
+ int bitsLeft = 0;
+ int current = 0;
+
+ switch (bitSize) {
+ case 1:
+ unrolledUnPack1(buffer, offset, len, input);
+ return;
+ case 2:
+ unrolledUnPack2(buffer, offset, len, input);
+ return;
+ case 4:
+ unrolledUnPack4(buffer, offset, len, input);
+ return;
+ case 8:
+ unrolledUnPack8(buffer, offset, len, input);
+ return;
+ case 16:
+ unrolledUnPack16(buffer, offset, len, input);
+ return;
+ case 24:
+ unrolledUnPack24(buffer, offset, len, input);
+ return;
+ case 32:
+ unrolledUnPack32(buffer, offset, len, input);
+ return;
+ case 40:
+ unrolledUnPack40(buffer, offset, len, input);
+ return;
+ case 48:
+ unrolledUnPack48(buffer, offset, len, input);
+ return;
+ case 56:
+ unrolledUnPack56(buffer, offset, len, input);
+ return;
+ case 64:
+ unrolledUnPack64(buffer, offset, len, input);
+ return;
+ default:
+ break;
+ }
+
+ for(int i = offset; i < (offset + len); i++) {
+ long result = 0;
+ int bitsLeftToRead = bitSize;
+ while (bitsLeftToRead > bitsLeft) {
+ result <<= bitsLeft;
+ result |= current & ((1 << bitsLeft) - 1);
+ bitsLeftToRead -= bitsLeft;
+ current = input.read();
+ bitsLeft = 8;
+ }
+
+ // handle the left over bits
+ if (bitsLeftToRead > 0) {
+ result <<= bitsLeftToRead;
+ bitsLeft -= bitsLeftToRead;
+ result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+ }
+ buffer[i] = result;
+ }
+ }
+
+
+ private void unrolledUnPack1(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = input.read();
+ buffer[i] = (val >>> 7) & 1;
+ buffer[i + 1] = (val >>> 6) & 1;
+ buffer[i + 2] = (val >>> 5) & 1;
+ buffer[i + 3] = (val >>> 4) & 1;
+ buffer[i + 4] = (val >>> 3) & 1;
+ buffer[i + 5] = (val >>> 2) & 1;
+ buffer[i + 6] = (val >>> 1) & 1;
+ buffer[i + 7] = val & 1;
+ }
+
+ if (remainder > 0) {
+ int startShift = 7;
+ val = input.read();
+ for (int i = endUnroll; i < endOffset; i++) {
+ buffer[i] = (val >>> startShift) & 1;
+ startShift -= 1;
+ }
+ }
+ }
+
+ private void unrolledUnPack2(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ final int numHops = 4;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = input.read();
+ buffer[i] = (val >>> 6) & 3;
+ buffer[i + 1] = (val >>> 4) & 3;
+ buffer[i + 2] = (val >>> 2) & 3;
+ buffer[i + 3] = val & 3;
+ }
+
+ if (remainder > 0) {
+ int startShift = 6;
+ val = input.read();
+ for (int i = endUnroll; i < endOffset; i++) {
+ buffer[i] = (val >>> startShift) & 3;
+ startShift -= 2;
+ }
+ }
+ }
+
+ private void unrolledUnPack4(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ final int numHops = 2;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int val = 0;
+ for (int i = offset; i < endUnroll; i = i + numHops) {
+ val = input.read();
+ buffer[i] = (val >>> 4) & 15;
+ buffer[i + 1] = val & 15;
+ }
+
+ if (remainder > 0) {
+ int startShift = 4;
+ val = input.read();
+ for (int i = endUnroll; i < endOffset; i++) {
+ buffer[i] = (val >>> startShift) & 15;
+ startShift -= 4;
+ }
+ }
+ }
+
+ private void unrolledUnPack8(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 1);
+ }
+
+ private void unrolledUnPack16(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 2);
+ }
+
+ private void unrolledUnPack24(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 3);
+ }
+
+ private void unrolledUnPack32(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 4);
+ }
+
+ private void unrolledUnPack40(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 5);
+ }
+
+ private void unrolledUnPack48(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 6);
+ }
+
+ private void unrolledUnPack56(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 7);
+ }
+
+ private void unrolledUnPack64(long[] buffer, int offset, int len,
+ InStream input) throws IOException {
+ unrolledUnPackBytes(buffer, offset, len, input, 8);
+ }
+
+ private void unrolledUnPackBytes(long[] buffer, int offset, int len, InStream input, int numBytes)
+ throws IOException {
+ final int numHops = 8;
+ final int remainder = len % numHops;
+ final int endOffset = offset + len;
+ final int endUnroll = endOffset - remainder;
+ int i = offset;
+ for (; i < endUnroll; i = i + numHops) {
+ readLongBE(input, buffer, i, numHops, numBytes);
+ }
+
+ if (remainder > 0) {
+ readRemainingLongs(buffer, i, input, remainder, numBytes);
+ }
+ }
+
+ private void readRemainingLongs(long[] buffer, int offset, InStream input, int remainder,
+ int numBytes) throws IOException {
+ final int toRead = remainder * numBytes;
+ // bulk read to buffer
+ int bytesRead = input.read(readBuffer, 0, toRead);
+ while (bytesRead != toRead) {
+ bytesRead += input.read(readBuffer, bytesRead, toRead - bytesRead);
+ }
+
+ int idx = 0;
+ switch (numBytes) {
+ case 1:
+ while (remainder > 0) {
+ buffer[offset++] = readBuffer[idx] & 255;
+ remainder--;
+ idx++;
+ }
+ break;
+ case 2:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE2(input, idx * 2);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 3:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE3(input, idx * 3);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 4:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE4(input, idx * 4);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 5:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE5(input, idx * 5);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 6:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE6(input, idx * 6);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 7:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE7(input, idx * 7);
+ remainder--;
+ idx++;
+ }
+ break;
+ case 8:
+ while (remainder > 0) {
+ buffer[offset++] = readLongBE8(input, idx * 8);
+ remainder--;
+ idx++;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void readLongBE(InStream in, long[] buffer, int start, int numHops, int numBytes)
+ throws IOException {
+ final int toRead = numHops * numBytes;
+ // bulk read to buffer
+ int bytesRead = in.read(readBuffer, 0, toRead);
+ while (bytesRead != toRead) {
+ bytesRead += in.read(readBuffer, bytesRead, toRead - bytesRead);
+ }
+
+ switch (numBytes) {
+ case 1:
+ buffer[start + 0] = readBuffer[0] & 255;
+ buffer[start + 1] = readBuffer[1] & 255;
+ buffer[start + 2] = readBuffer[2] & 255;
+ buffer[start + 3] = readBuffer[3] & 255;
+ buffer[start + 4] = readBuffer[4] & 255;
+ buffer[start + 5] = readBuffer[5] & 255;
+ buffer[start + 6] = readBuffer[6] & 255;
+ buffer[start + 7] = readBuffer[7] & 255;
+ break;
+ case 2:
+ buffer[start + 0] = readLongBE2(in, 0);
+ buffer[start + 1] = readLongBE2(in, 2);
+ buffer[start + 2] = readLongBE2(in, 4);
+ buffer[start + 3] = readLongBE2(in, 6);
+ buffer[start + 4] = readLongBE2(in, 8);
+ buffer[start + 5] = readLongBE2(in, 10);
+ buffer[start + 6] = readLongBE2(in, 12);
+ buffer[start + 7] = readLongBE2(in, 14);
+ break;
+ case 3:
+ buffer[start + 0] = readLongBE3(in, 0);
+ buffer[start + 1] = readLongBE3(in, 3);
+ buffer[start + 2] = readLongBE3(in, 6);
+ buffer[start + 3] = readLongBE3(in, 9);
+ buffer[start + 4] = readLongBE3(in, 12);
+ buffer[start + 5] = readLongBE3(in, 15);
+ buffer[start + 6] = readLongBE3(in, 18);
+ buffer[start + 7] = readLongBE3(in, 21);
+ break;
+ case 4:
+ buffer[start + 0] = readLongBE4(in, 0);
+ buffer[start + 1] = readLongBE4(in, 4);
+ buffer[start + 2] = readLongBE4(in, 8);
+ buffer[start + 3] = readLongBE4(in, 12);
+ buffer[start + 4] = readLongBE4(in, 16);
+ buffer[start + 5] = readLongBE4(in, 20);
+ buffer[start + 6] = readLongBE4(in, 24);
+ buffer[start + 7] = readLongBE4(in, 28);
+ break;
+ case 5:
+ buffer[start + 0] = readLongBE5(in, 0);
+ buffer[start + 1] = readLongBE5(in, 5);
+ buffer[start + 2] = readLongBE5(in, 10);
+ buffer[start + 3] = readLongBE5(in, 15);
+ buffer[start + 4] = readLongBE5(in, 20);
+ buffer[start + 5] = readLongBE5(in, 25);
+ buffer[start + 6] = readLongBE5(in, 30);
+ buffer[start + 7] = readLongBE5(in, 35);
+ break;
+ case 6:
+ buffer[start + 0] = readLongBE6(in, 0);
+ buffer[start + 1] = readLongBE6(in, 6);
+ buffer[start + 2] = readLongBE6(in, 12);
+ buffer[start + 3] = readLongBE6(in, 18);
+ buffer[start + 4] = readLongBE6(in, 24);
+ buffer[start + 5] = readLongBE6(in, 30);
+ buffer[start + 6] = readLongBE6(in, 36);
+ buffer[start + 7] = readLongBE6(in, 42);
+ break;
+ case 7:
+ buffer[start + 0] = readLongBE7(in, 0);
+ buffer[start + 1] = readLongBE7(in, 7);
+ buffer[start + 2] = readLongBE7(in, 14);
+ buffer[start + 3] = readLongBE7(in, 21);
+ buffer[start + 4] = readLongBE7(in, 28);
+ buffer[start + 5] = readLongBE7(in, 35);
+ buffer[start + 6] = readLongBE7(in, 42);
+ buffer[start + 7] = readLongBE7(in, 49);
+ break;
+ case 8:
+ buffer[start + 0] = readLongBE8(in, 0);
+ buffer[start + 1] = readLongBE8(in, 8);
+ buffer[start + 2] = readLongBE8(in, 16);
+ buffer[start + 3] = readLongBE8(in, 24);
+ buffer[start + 4] = readLongBE8(in, 32);
+ buffer[start + 5] = readLongBE8(in, 40);
+ buffer[start + 6] = readLongBE8(in, 48);
+ buffer[start + 7] = readLongBE8(in, 56);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private long readLongBE2(InStream in, int rbOffset) {
+ return (((readBuffer[rbOffset] & 255) << 8)
+ + ((readBuffer[rbOffset + 1] & 255) << 0));
+ }
+
+ private long readLongBE3(InStream in, int rbOffset) {
+ return (((readBuffer[rbOffset] & 255) << 16)
+ + ((readBuffer[rbOffset + 1] & 255) << 8)
+ + ((readBuffer[rbOffset + 2] & 255) << 0));
+ }
+
+ private long readLongBE4(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 24)
+ + ((readBuffer[rbOffset + 1] & 255) << 16)
+ + ((readBuffer[rbOffset + 2] & 255) << 8)
+ + ((readBuffer[rbOffset + 3] & 255) << 0));
+ }
+
+ private long readLongBE5(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 24)
+ + ((readBuffer[rbOffset + 2] & 255) << 16)
+ + ((readBuffer[rbOffset + 3] & 255) << 8)
+ + ((readBuffer[rbOffset + 4] & 255) << 0));
+ }
+
+ private long readLongBE6(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 40)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 2] & 255) << 24)
+ + ((readBuffer[rbOffset + 3] & 255) << 16)
+ + ((readBuffer[rbOffset + 4] & 255) << 8)
+ + ((readBuffer[rbOffset + 5] & 255) << 0));
+ }
+
+ private long readLongBE7(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 48)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 40)
+ + ((long) (readBuffer[rbOffset + 2] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 3] & 255) << 24)
+ + ((readBuffer[rbOffset + 4] & 255) << 16)
+ + ((readBuffer[rbOffset + 5] & 255) << 8)
+ + ((readBuffer[rbOffset + 6] & 255) << 0));
+ }
+
+ private long readLongBE8(InStream in, int rbOffset) {
+ return (((long) (readBuffer[rbOffset] & 255) << 56)
+ + ((long) (readBuffer[rbOffset + 1] & 255) << 48)
+ + ((long) (readBuffer[rbOffset + 2] & 255) << 40)
+ + ((long) (readBuffer[rbOffset + 3] & 255) << 32)
+ + ((long) (readBuffer[rbOffset + 4] & 255) << 24)
+ + ((readBuffer[rbOffset + 5] & 255) << 16)
+ + ((readBuffer[rbOffset + 6] & 255) << 8)
+ + ((readBuffer[rbOffset + 7] & 255) << 0));
+ }
+
+ // Do not want to use Guava LongMath.checkedSubtract() here as it will throw
+ // ArithmeticException in case of overflow
+ public boolean isSafeSubtract(long left, long right) {
+ return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
+ }
+}