You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/04/21 20:43:11 UTC
[1/2] parquet-mr git commit: PARQUET-225: Add support for INT64 delta
encoding.
Repository: parquet-mr
Updated Branches:
refs/heads/master 741944332 -> 8bcfe6c55
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
index 8df5f39..b7dc26b 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
@@ -22,10 +22,10 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
-
import org.apache.parquet.Log;
import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingReader;
import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
@@ -46,6 +46,24 @@ public class TestByteBitPacking {
Assert.assertArrayEquals("width "+i, values, unpacked);
}
}
+
+ @Test
+ public void testPackUnPackLong() {
+ LOG.debug("");
+ LOG.debug("testPackUnPackLong");
+ for (int i = 1; i < 64; i++) {
+ LOG.debug("Width: " + i);
+ long[] unpacked32 = new long[32];
+ long[] unpacked8 = new long[32];
+ long[] values = generateValuesLong(i);
+ packUnpack32(Packer.BIG_ENDIAN.newBytePackerForLong(i), values, unpacked32);
+ LOG.debug("Output 32: " + TestBitPacking.toString(unpacked32));
+ Assert.assertArrayEquals("width "+i, values, unpacked32);
+ packUnpack8(Packer.BIG_ENDIAN.newBytePackerForLong(i), values, unpacked8);
+ LOG.debug("Output 8: " + TestBitPacking.toString(unpacked8));
+ Assert.assertArrayEquals("width "+i, values, unpacked8);
+ }
+ }
private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
byte[] packed = new byte[packer.getBitWidth() * 4];
@@ -54,6 +72,24 @@ public class TestByteBitPacking {
packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
}
+ private void packUnpack32(BytePackerForLong packer, long[] values, long[] unpacked) {
+ byte[] packed = new byte[packer.getBitWidth() * 4];
+ packer.pack32Values(values, 0, packed, 0);
+ LOG.debug("packed: " + TestBitPacking.toString(packed));
+ packer.unpack32Values(packed, 0, unpacked, 0);
+ }
+
+ private void packUnpack8(BytePackerForLong packer, long[] values, long[] unpacked) {
+ byte[] packed = new byte[packer.getBitWidth() * 4];
+ for (int i = 0; i < 4; i++) {
+ packer.pack8Values(values, 8 * i, packed, packer.getBitWidth() * i);
+ }
+ LOG.debug("packed: " + TestBitPacking.toString(packed));
+ for (int i = 0; i < 4; i++) {
+ packer.unpack8Values(packed, packer.getBitWidth() * i, unpacked, 8 * i);
+ }
+ }
+
private int[] generateValues(int bitWidth) {
int[] values = new int[32];
for (int j = 0; j < values.length; j++) {
@@ -63,6 +99,16 @@ public class TestByteBitPacking {
return values;
}
+ private long[] generateValuesLong(int bitWidth) {
+ long[] values = new long[32];
+ Random random = new Random(0);
+ for (int j = 0; j < values.length; j++) {
+ values[j] = random.nextLong() & ((1l << bitWidth) - 1l);
+ }
+ LOG.debug("Input: " + TestBitPacking.toString(values));
+ return values;
+ }
+
@Test
public void testPackUnPackAgainstHandWritten() throws IOException {
LOG.debug("");
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
index 3d182e2..b4868e9 100644
--- a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
+++ b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
@@ -27,23 +27,40 @@ import java.io.IOException;
* This class generates bit packers that pack the most significant bit first.
* The result of the generation is checked in. To regenerate the code run this class and check in the result.
*
- * TODO: remove the unnecessary masks for perf
- *
* @author Julien Le Dem
*
*/
public class ByteBasedBitPackingGenerator {
- private static final String CLASS_NAME_PREFIX = "ByteBitPacking";
- private static final int PACKER_COUNT = 32;
+ private static final String CLASS_NAME_PREFIX_FOR_INT = "ByteBitPacking";
+ private static final String CLASS_NAME_PREFIX_FOR_LONG = "ByteBitPackingForLong";
+ private static final String VARIABLE_TYPE_FOR_INT = "int";
+ private static final String VARIABLE_TYPE_FOR_LONG = "long";
+ private static final int MAX_BITS_FOR_INT = 32;
+ private static final int MAX_BITS_FOR_LONG = 64;
public static void main(String[] args) throws Exception {
String basePath = args[0];
- generateScheme(CLASS_NAME_PREFIX + "BE", true, basePath);
- generateScheme(CLASS_NAME_PREFIX + "LE", false, basePath);
+ // Int for Big Endian
+ generateScheme(false, true, basePath);
+
+ // Int for Little Endian
+ generateScheme(false, false, basePath);
+
+ // Long for Big Endian
+ generateScheme(true, true, basePath);
+
+ // Long for Little Endian
+ generateScheme(true, false, basePath);
}
- private static void generateScheme(String className, boolean msbFirst, String basePath) throws IOException {
+ private static void generateScheme(boolean isLong, boolean msbFirst,
+ String basePath) throws IOException {
+ String baseClassName = isLong ? CLASS_NAME_PREFIX_FOR_LONG : CLASS_NAME_PREFIX_FOR_INT;
+ String className = msbFirst ? (baseClassName + "BE") : (baseClassName + "LE");
+ int maxBits = isLong ? MAX_BITS_FOR_LONG : MAX_BITS_FOR_INT;
+ String nameSuffix = isLong ? "ForLong" : "";
+
final File file = new File(basePath + "/org/apache/parquet/column/values/bitpacking/" + className + ".java").getAbsoluteFile();
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
@@ -65,48 +82,58 @@ public class ByteBasedBitPackingGenerator {
fw.append(" */\n");
fw.append("public abstract class " + className + " {\n");
fw.append("\n");
- fw.append(" private static final BytePacker[] packers = new BytePacker[33];\n");
+ fw.append(" private static final BytePacker" + nameSuffix + "[] packers = new BytePacker" + nameSuffix + "[" + (maxBits + 1) + "];\n");
fw.append(" static {\n");
- for (int i = 0; i <= PACKER_COUNT; i++) {
+ for (int i = 0; i <= maxBits; i++) {
fw.append(" packers[" + i + "] = new Packer" + i + "();\n");
}
fw.append(" }\n");
fw.append("\n");
- fw.append(" public static final BytePackerFactory factory = new BytePackerFactory() {\n");
- fw.append(" public BytePacker newBytePacker(int bitWidth) {\n");
+ fw.append(" public static final BytePacker" + nameSuffix + "Factory factory = new BytePacker" + nameSuffix + "Factory() {\n");
+ fw.append(" public BytePacker" + nameSuffix + " newBytePacker" + nameSuffix + "(int bitWidth) {\n");
fw.append(" return packers[bitWidth];\n");
fw.append(" }\n");
fw.append(" };\n");
fw.append("\n");
- for (int i = 0; i <= PACKER_COUNT; i++) {
- generateClass(fw, i, msbFirst);
+ for (int i = 0; i <= maxBits; i++) {
+ generateClass(fw, i, isLong, msbFirst);
fw.append("\n");
}
fw.append("}\n");
fw.close();
}
- private static void generateClass(FileWriter fw, int bitWidth, boolean msbFirst) throws IOException {
- fw.append(" private static final class Packer" + bitWidth + " extends BytePacker {\n");
+ private static void generateClass(FileWriter fw, int bitWidth, boolean isLong, boolean msbFirst) throws IOException {
+ String nameSuffix = isLong ? "ForLong" : "";
+ fw.append(" private static final class Packer" + bitWidth + " extends BytePacker" + nameSuffix + " {\n");
fw.append("\n");
fw.append(" private Packer" + bitWidth + "() {\n");
fw.append(" super("+bitWidth+");\n");
fw.append(" }\n");
fw.append("\n");
// Packing
- generatePack(fw, bitWidth, 1, msbFirst);
- generatePack(fw, bitWidth, 4, msbFirst);
+ generatePack(fw, bitWidth, 1, isLong, msbFirst);
+ generatePack(fw, bitWidth, 4, isLong, msbFirst);
// Unpacking
- generateUnpack(fw, bitWidth, 1, msbFirst, true);
- generateUnpack(fw, bitWidth, 1, msbFirst, false);
- generateUnpack(fw, bitWidth, 4, msbFirst, true);
- generateUnpack(fw, bitWidth, 4, msbFirst, false);
+ generateUnpack(fw, bitWidth, 1, isLong, msbFirst, true);
+ generateUnpack(fw, bitWidth, 1, isLong, msbFirst, false);
+ generateUnpack(fw, bitWidth, 4, isLong, msbFirst, true);
+ generateUnpack(fw, bitWidth, 4, isLong, msbFirst, false);
fw.append(" }\n");
}
-
- private static int getShift(FileWriter fw, int bitWidth, boolean msbFirst,
+
+ private static class ShiftMask {
+ ShiftMask(int shift, long mask) {
+ this.shift = shift;
+ this.mask = mask;
+ }
+ public int shift;
+ public long mask;
+ }
+
+ private static ShiftMask getShift(FileWriter fw, int bitWidth, boolean isLong, boolean msbFirst,
int byteIndex, int valueIndex) throws IOException {
// relative positions of the start and end of the value to the start and end of the byte
int valueStartBitIndex = (valueIndex * bitWidth) - (8 * (byteIndex));
@@ -120,6 +147,7 @@ public class ByteBasedBitPackingGenerator {
int byteEndBitWanted;
int shift;
+ int widthWanted;
if (msbFirst) {
valueStartBitWanted = valueStartBitIndex < 0 ? bitWidth - 1 + valueStartBitIndex : bitWidth - 1;
@@ -127,13 +155,17 @@ public class ByteBasedBitPackingGenerator {
byteStartBitWanted = valueStartBitIndex < 0 ? 8 : 7 - valueStartBitIndex;
byteEndBitWanted = valueEndBitIndex > 0 ? 0 : -valueEndBitIndex;
shift = valueEndBitWanted - byteEndBitWanted;
+ widthWanted = Math.min(7, byteStartBitWanted) - Math.min(7, byteEndBitWanted) + 1;
} else {
valueStartBitWanted = bitWidth - 1 - (valueEndBitIndex > 0 ? valueEndBitIndex : 0);
valueEndBitWanted = bitWidth - 1 - (valueStartBitIndex < 0 ? bitWidth - 1 + valueStartBitIndex : bitWidth - 1);
byteStartBitWanted = 7 - (valueEndBitIndex > 0 ? 0 : -valueEndBitIndex);
byteEndBitWanted = 7 - (valueStartBitIndex < 0 ? 8 : 7 - valueStartBitIndex);
shift = valueStartBitWanted - byteStartBitWanted;
+ widthWanted = Math.max(0, byteStartBitWanted) - Math.max(0, byteEndBitWanted) + 1;
}
+
+ int maskWidth = widthWanted + Math.max(0, shift);
visualizeAlignment(
fw, bitWidth, valueEndBitIndex,
@@ -141,7 +173,7 @@ public class ByteBasedBitPackingGenerator {
byteStartBitWanted, byteEndBitWanted,
shift
);
- return shift;
+ return new ShiftMask(shift, genMask(maskWidth, isLong));
}
private static void visualizeAlignment(FileWriter fw, int bitWidth,
@@ -177,9 +209,11 @@ public class ByteBasedBitPackingGenerator {
fw.append(" ");
}
- private static void generatePack(FileWriter fw, int bitWidth, int batch, boolean msbFirst) throws IOException {
- int mask = genMask(bitWidth);
- fw.append(" public final void pack" + (batch * 8) + "Values(final int[] in, final int inPos, final byte[] out, final int outPos) {\n");
+ private static void generatePack(FileWriter fw, int bitWidth, int batch, boolean isLong, boolean msbFirst) throws IOException {
+ long mask = genMask(bitWidth, isLong);
+ String maskSuffix = isLong ? "L" : "";
+ String variableType = isLong ? VARIABLE_TYPE_FOR_LONG : VARIABLE_TYPE_FOR_INT;
+ fw.append(" public final void pack" + (batch * 8) + "Values(final " + variableType + "[] in, final int inPos, final byte[] out, final int outPos) {\n");
for (int byteIndex = 0; byteIndex < bitWidth * batch; ++byteIndex) {
fw.append(" out[" + align(byteIndex, 2) + " + outPos] = (byte)((\n");
int startIndex = (byteIndex * 8) / bitWidth;
@@ -191,32 +225,31 @@ public class ByteBasedBitPackingGenerator {
} else {
fw.append("\n | ");
}
- int shift = getShift(fw, bitWidth, msbFirst, byteIndex, valueIndex);
+ ShiftMask shiftMask = getShift(fw, bitWidth, isLong, msbFirst, byteIndex, valueIndex);
String shiftString = ""; // used when shift == 0
- if (shift > 0) {
- shiftString = " >>> " + shift;
- } else if (shift < 0) {
- shiftString = " << " + ( - shift);
+ if (shiftMask.shift > 0) {
+ shiftString = " >>> " + shiftMask.shift;
+ } else if (shiftMask.shift < 0) {
+ shiftString = " << " + ( - shiftMask.shift);
}
- fw.append("((in[" + align(valueIndex, 2) + " + inPos] & " + mask + ")" + shiftString + ")");
+ fw.append("((in[" + align(valueIndex, 2) + " + inPos] & " + mask + maskSuffix + ")" + shiftString + ")");
}
fw.append(") & 255);\n");
}
fw.append(" }\n");
}
- private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst, boolean useByteArray)
+ private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean isLong, boolean msbFirst, boolean useByteArray)
throws IOException {
- final String bufferDataType;
- if (useByteArray) {
- bufferDataType = "byte[]";
- } else {
- bufferDataType = "ByteBuffer";
- }
- fw.append(" public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, final int inPos, final int[] out, final int outPos) {\n");
+ final String variableType = isLong ? VARIABLE_TYPE_FOR_LONG : VARIABLE_TYPE_FOR_INT;
+ final String bufferDataType = useByteArray ? "byte[]" : "ByteBuffer";
+
+ fw.append(" public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, "
+ + "final int inPos, final " + variableType + "[] out, final int outPos) {\n");
+
if (bitWidth > 0) {
- int mask = genMask(bitWidth);
+ String maskSuffix = isLong ? "L" : "";
for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) {
fw.append(" out[" + align(valueIndex, 2) + " + outPos] =\n");
@@ -229,14 +262,16 @@ public class ByteBasedBitPackingGenerator {
} else {
fw.append("\n | ");
}
- int shift = getShift(fw, bitWidth, msbFirst, byteIndex, valueIndex);
+
+ ShiftMask shiftMask = getShift(fw, bitWidth, isLong, msbFirst, byteIndex, valueIndex);
String shiftString = ""; // when shift == 0
- if (shift < 0) {
- shiftString = ">>> " + (-shift);
- } else if (shift > 0){
- shiftString = "<< " + shift;
+ if (shiftMask.shift < 0) {
+ shiftString = ">> " + (-shiftMask.shift);
+ } else if (shiftMask.shift > 0){
+ shiftString = "<< " + shiftMask.shift;
}
+
final String byteAccess;
if (useByteArray) {
byteAccess = "in[" + align(byteIndex, 2) + " + inPos]";
@@ -244,7 +279,10 @@ public class ByteBasedBitPackingGenerator {
// use ByteBuffer#get(index) method
byteAccess = "in.get(" + align(byteIndex, 2) + " + inPos)";
}
- fw.append(" (((((int)" + byteAccess + ") & 255) " + shiftString + ") & " + mask + ")");
+
+ // Shift the wanted bits to the least significant position and mask them knowing how many bits to get.
+ fw.append(" ((((" + variableType + ")" + byteAccess + ") " + shiftString +
+ ") & " + shiftMask.mask + maskSuffix + ")");
}
fw.append(";\n");
}
@@ -252,8 +290,14 @@ public class ByteBasedBitPackingGenerator {
fw.append(" }\n");
}
- private static int genMask(int bitWidth) {
- int mask = 0;
+ private static long genMask(int bitWidth, boolean isLong) {
+ int maxBitWidth = isLong ? MAX_BITS_FOR_LONG : MAX_BITS_FOR_INT;
+ if (bitWidth >= maxBitWidth) {
+ // -1 is all ones (11111...1111), so it covers every bit the value can hold.
+ return -1;
+ }
+
+ long mask = 0;
for (int i = 0; i < bitWidth; i++) {
mask <<= 1;
mask |= 1;
[2/2] parquet-mr git commit: PARQUET-225: Add support for INT64 delta
encoding.
Posted by bl...@apache.org.
PARQUET-225: Add support for INT64 delta encoding.
Author: Vassil Lunchev <va...@leanplum.com>
Closes #154 from lunchev:int64 and squashes the following commits:
84a40fe [Vassil Lunchev] INT64 support for Delta Encoding
4389af4 [Vassil Lunchev] splitting delta INT32 and delta INT64
e5e8fe2 [Vassil Lunchev] split delta encoding tests for INT32 and for INT64
eb4383a [Ryan Blue] PARQUET-225: Avoid multiple small copies in delta int/long encoding.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bcfe6c5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bcfe6c5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bcfe6c5
Branch: refs/heads/master
Commit: 8bcfe6c55e2588c1047368b4edbf733d1c1d5381
Parents: 7419443
Author: Vassil Lunchev <va...@leanplum.com>
Authored: Tue Mar 24 19:33:03 2015 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Thu Apr 21 11:37:51 2016 -0700
----------------------------------------------------------------------
.../org/apache/parquet/column/Encoding.java | 5 +-
.../parquet/column/ParquetProperties.java | 8 +-
.../delta/DeltaBinaryPackingValuesReader.java | 26 +-
.../delta/DeltaBinaryPackingValuesWriter.java | 158 +----------
...eltaBinaryPackingValuesWriterForInteger.java | 199 ++++++++++++++
.../DeltaBinaryPackingValuesWriterForLong.java | 201 ++++++++++++++
.../DeltaLengthByteArrayValuesWriter.java | 3 +-
.../deltastrings/DeltaByteArrayWriter.java | 4 +-
...BinaryPackingValuesWriterForIntegerTest.java | 266 +++++++++++++++++++
...ltaBinaryPackingValuesWriterForLongTest.java | 263 ++++++++++++++++++
.../DeltaBinaryPackingValuesWriterTest.java | 264 ------------------
.../benchmark/BenchmarkIntegerOutputSize.java | 12 +-
.../BenchmarkReadingRandomIntegers.java | 29 +-
.../benchmark/RandomWritingBenchmarkTest.java | 25 +-
.../org/apache/parquet/bytes/BytesUtils.java | 36 +++
.../org/apache/parquet/bytes/BytesInput.java | 39 ++-
.../values/bitpacking/BytePackerForLong.java | 112 ++++++++
.../bitpacking/BytePackerForLongFactory.java | 25 ++
.../column/values/bitpacking/Packer.java | 24 +-
.../values/bitpacking/TestBitPacking.java | 14 +
.../values/bitpacking/TestByteBitPacking.java | 48 +++-
.../ByteBasedBitPackingGenerator.java | 144 ++++++----
22 files changed, 1397 insertions(+), 508 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
index 87bc798..0a24e76 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column;
import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
@@ -163,8 +164,8 @@ public enum Encoding {
DELTA_BINARY_PACKED {
@Override
public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
- if(descriptor.getType() != INT32) {
- throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32");
+ if(descriptor.getType() != INT32 && descriptor.getType() != INT64) {
+ throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32 and INT64");
}
return new DeltaBinaryPackingValuesReader();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 0c07d54..9ed7736 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -32,7 +32,8 @@ import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.boundedint.DevNullValuesWriter;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
@@ -212,9 +213,10 @@ public class ParquetProperties {
case FIXED_LEN_BYTE_ARRAY:
return new DeltaByteArrayWriter(initialSlabSize, pageSizeThreshold, allocator);
case INT32:
- return new DeltaBinaryPackingValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
- case INT96:
+ return new DeltaBinaryPackingValuesWriterForInteger(initialSlabSize, pageSizeThreshold, allocator);
case INT64:
+ return new DeltaBinaryPackingValuesWriterForLong(initialSlabSize, pageSizeThreshold, allocator);
+ case INT96:
case DOUBLE:
case FLOAT:
return plainWriter(path);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index 3f92deb..a3355d2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -18,11 +18,13 @@
*/
package org.apache.parquet.column.values.delta;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
@@ -40,12 +42,12 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
* values read by the caller
*/
private int valuesRead;
- private int minDeltaInCurrentBlock;
+ private long minDeltaInCurrentBlock;
private ByteBuffer page;
/**
* stores the decoded values including the first value which is written to the header
*/
- private int[] valuesBuffer;
+ private long[] valuesBuffer;
/**
* values loaded to the buffer, it could be bigger than the totalValueCount
* when data is not aligned to mini block, which means padding 0s are in the buffer
@@ -74,7 +76,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
bitWidths = new int[config.miniBlockNumInABlock];
//read first value from header
- valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarInt(in);
+ valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarLong(in);
while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
loadNewBlockToBuffer();
@@ -94,7 +96,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
private void allocateValuesBuffer() {
int totalMiniBlockCount = (int) Math.ceil((double) totalValueCount / config.miniBlockSizeInValues);
//+ 1 because first value written to header is also stored in values buffer
- valuesBuffer = new int[totalMiniBlockCount * config.miniBlockSizeInValues + 1];
+ valuesBuffer = new long[totalMiniBlockCount * config.miniBlockSizeInValues + 1];
}
@Override
@@ -105,6 +107,12 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
@Override
public int readInteger() {
+ // TODO: probably implement it separately
+ return (int) readLong();
+ }
+
+ @Override
+ public long readLong() {
checkRead();
return valuesBuffer[valuesRead++];
}
@@ -117,7 +125,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
private void loadNewBlockToBuffer() {
try {
- minDeltaInCurrentBlock = BytesUtils.readZigZagVarInt(in);
+ minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
} catch (IOException e) {
throw new ParquetDecodingException("can not read min delta in current block", e);
}
@@ -127,7 +135,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
// mini block is atomic for reading, we read a mini block when there are more values left
int i;
for (i = 0; i < config.miniBlockNumInABlock && valuesBuffered < totalValueCount; i++) {
- BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidths[i]);
+ BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidths[i]);
unpackMiniBlock(packer);
}
@@ -144,13 +152,13 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*
* @param packer the packer created from bitwidth of current mini block
*/
- private void unpackMiniBlock(BytePacker packer) {
+ private void unpackMiniBlock(BytePackerForLong packer) {
for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
unpack8Values(packer);
}
}
- private void unpack8Values(BytePacker packer) {
+ private void unpack8Values(BytePackerForLong packer) {
//calculate the pos because the packer api uses array not stream
int pos = page.limit() - in.available();
packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
index 421182f..ac3c594 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
@@ -24,7 +24,7 @@ import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetEncodingException;
@@ -50,68 +50,40 @@ import java.io.IOException;
*
* @author Tianshuo Deng
*/
-public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
- /**
- * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
- * reused between flushes.
- */
- public static final int MAX_BITWIDTH = 32;
+public abstract class DeltaBinaryPackingValuesWriter extends ValuesWriter {
public static final int DEFAULT_NUM_BLOCK_VALUES = 128;
public static final int DEFAULT_NUM_MINIBLOCKS = 4;
- private final CapacityByteArrayOutputStream baos;
+ protected final CapacityByteArrayOutputStream baos;
/**
* stores blockSizeInValues, miniBlockNumInABlock and miniBlockSizeInValues
*/
- private final DeltaBinaryPackingConfig config;
+ protected final DeltaBinaryPackingConfig config;
/**
* bit width for each mini block, reused between flushes
*/
- private final int[] bitWidths;
+ protected final int[] bitWidths;
- private int totalValueCount = 0;
+ protected int totalValueCount = 0;
/**
* a pointer to deltaBlockBuffer indicating the end of deltaBlockBuffer
* the number of values in the deltaBlockBuffer that haven't flushed to baos
* it will be reset after each flush
*/
- private int deltaValuesToFlush = 0;
-
- /**
- * stores delta values starting from the 2nd value written(1st value is stored in header).
- * It's reused between flushes
- */
- private int[] deltaBlockBuffer;
+ protected int deltaValuesToFlush = 0;
/**
* bytes buffer for a mini block, it is reused for each mini block.
* Therefore the size of biggest miniblock with bitwith of MAX_BITWITH is allocated
*/
- private byte[] miniBlockByteBuffer;
-
- /**
- * firstValue is written to the header of the page
- */
- private int firstValue = 0;
-
- /**
- * cache previous written value for calculating delta
- */
- private int previousValue = 0;
-
- /**
- * min delta is written to the beginning of each block.
- * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
- * therefore are all positive
- * it will be reset after each flush
- */
- private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ protected byte[] miniBlockByteBuffer;
+// TODO: remove this.
public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize, ByteBufferAllocator allocator) {
this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
}
@@ -119,8 +91,6 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize, ByteBufferAllocator allocator) {
this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
bitWidths = new int[config.miniBlockNumInABlock];
- deltaBlockBuffer = new int[blockSizeInValues];
- miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
baos = new CapacityByteArrayOutputStream(slabSize, pageSize, allocator);
}
@@ -129,64 +99,7 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
return baos.size();
}
- @Override
- public void writeInteger(int v) {
- totalValueCount++;
-
- if (totalValueCount == 1) {
- firstValue = v;
- previousValue = firstValue;
- return;
- }
-
- int delta = v - previousValue;//calculate delta
- previousValue = v;
-
- deltaBlockBuffer[deltaValuesToFlush++] = delta;
-
- if (delta < minDeltaInCurrentBlock) {
- minDeltaInCurrentBlock = delta;
- }
-
- if (config.blockSizeInValues == deltaValuesToFlush) {
- flushBlockBuffer();
- }
- }
-
- private void flushBlockBuffer() {
- //since we store the min delta, the deltas will be converted to be the difference to min delta and all positive
- for (int i = 0; i < deltaValuesToFlush; i++) {
- deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
- }
-
- writeMinDelta();
- int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
-
- calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
- for (int i = 0; i < config.miniBlockNumInABlock; i++) {
- writeBitWidthForMiniBlock(i);
- }
-
- for (int i = 0; i < miniBlocksToFlush; i++) {
- //writing i th miniblock
- int currentBitWidth = bitWidths[i];
- BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
- int miniBlockStart = i * config.miniBlockSizeInValues;
- for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {//8 values per pack
- // mini block is atomic in terms of flushing
- // This may write more values when reach to the end of data writing to last mini block,
- // since it may not be aligend to miniblock,
- // but doesnt matter. The reader uses total count to see if reached the end.
- packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, 0);
- baos.write(miniBlockByteBuffer, 0, currentBitWidth);
- }
- }
-
- minDeltaInCurrentBlock = Integer.MAX_VALUE;
- deltaValuesToFlush = 0;
- }
-
- private void writeBitWidthForMiniBlock(int i) {
+ protected void writeBitWidthForMiniBlock(int i) {
try {
BytesUtils.writeIntLittleEndianOnOneByte(baos, bitWidths[i]);
} catch (IOException e) {
@@ -194,57 +107,10 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
}
}
- private void writeMinDelta() {
- try {
- BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, baos);
- } catch (IOException e) {
- throw new ParquetEncodingException("can not write min delta for block", e);
- }
- }
-
- /**
- * iterate through values in each mini block and calculate the bitWidths of max values.
- *
- * @param miniBlocksToFlush
- */
- private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
- for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
-
- int mask = 0;
- int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
-
- //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
- int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
-
- for (int i = miniStart; i < miniEnd; i++) {
- mask |= deltaBlockBuffer[i];
- }
- bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
- }
- }
-
- private int getMiniBlockCountToFlush(double numberCount) {
+ protected int getMiniBlockCountToFlush(double numberCount) {
return (int) Math.ceil(numberCount / config.miniBlockSizeInValues);
}
- /**
- * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
- *
- * @return
- */
- @Override
- public BytesInput getBytes() {
- //The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
- if (deltaValuesToFlush != 0) {
- flushBlockBuffer();
- }
- return BytesInput.concat(
- config.toBytesInput(),
- BytesInput.fromUnsignedVarInt(totalValueCount),
- BytesInput.fromZigZagVarInt(firstValue),
- BytesInput.from(baos));
- }
-
@Override
public Encoding getEncoding() {
return Encoding.DELTA_BINARY_PACKED;
@@ -255,7 +121,6 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
this.totalValueCount = 0;
this.baos.reset();
this.deltaValuesToFlush = 0;
- this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
}
@Override
@@ -263,7 +128,6 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
this.totalValueCount = 0;
this.baos.close();
this.deltaValuesToFlush = 0;
- this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java
new file mode 100644
index 0000000..f2d0acc
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForInteger.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import java.io.IOException;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Write integers (INT32) with delta encoding and binary packing.
+ *
+ * @author Vassil Lunchev
+ */
+public class DeltaBinaryPackingValuesWriterForInteger extends DeltaBinaryPackingValuesWriter {
+ /**
+ * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+ * reused between flushes.
+ */
+ private static final int MAX_BITWIDTH = 32;
+
+ /**
+ * stores delta values starting from the 2nd value written(1st value is stored in header).
+ * It's reused between flushes
+ */
+ private int[] deltaBlockBuffer;
+
+ /**
+ * firstValue is written to the header of the page
+ */
+ private int firstValue = 0;
+
+ /**
+ * cache previous written value for calculating delta
+ */
+ private int previousValue = 0;
+
+ /**
+ * min delta is written to the beginning of each block.
+ * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+ * therefore are all positive
+ * it will be reset after each flush
+ */
+ private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+
+ public DeltaBinaryPackingValuesWriterForInteger(
+ int slabSize, int pageSize, ByteBufferAllocator allocator) {
+ this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
+ }
+
+ public DeltaBinaryPackingValuesWriterForInteger(int blockSizeInValues, int miniBlockNum,
+ int slabSize, int pageSize, ByteBufferAllocator allocator) {
+ super(blockSizeInValues, miniBlockNum, slabSize, pageSize, allocator);
+ deltaBlockBuffer = new int[config.blockSizeInValues];
+ miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
+ }
+
+ @Override
+ public void writeInteger(int v) {
+ totalValueCount++;
+
+ if (totalValueCount == 1) {
+ firstValue = v;
+ previousValue = firstValue;
+ return;
+ }
+
+ // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+ // Java int is working as a modular ring with base 2^32 and because of the plus and minus
+ // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+ int delta = v - previousValue;
+ previousValue = v;
+
+ deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+ if (delta < minDeltaInCurrentBlock) {
+ minDeltaInCurrentBlock = delta;
+ }
+
+ if (config.blockSizeInValues == deltaValuesToFlush) {
+ flushBlockBuffer();
+ }
+ }
+
+ private void flushBlockBuffer() {
+ // since we store the min delta, the deltas will be converted to be the difference to min delta
+ // and all positive
+ for (int i = 0; i < deltaValuesToFlush; i++) {
+ deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+ }
+
+ writeMinDelta();
+ int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+ calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+ for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+ writeBitWidthForMiniBlock(i);
+ }
+
+ for (int i = 0; i < miniBlocksToFlush; i++) {
+ // writing i th miniblock
+ int currentBitWidth = bitWidths[i];
+ int blockOffset = 0;
+ BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
+ int miniBlockStart = i * config.miniBlockSizeInValues;
+ for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {//8 values per pack
+ // mini block is atomic in terms of flushing
+ // This may write extra values when the data ends inside the last mini block,
+ // since the data may not be aligned to a mini block boundary,
+ // but that doesn't matter: the reader uses the total count to detect the end.
+ packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+ blockOffset += currentBitWidth;
+ }
+ baos.write(miniBlockByteBuffer, 0, blockOffset);
+ }
+
+ minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ deltaValuesToFlush = 0;
+ }
+
+ private void writeMinDelta() {
+ try {
+ BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, baos);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write min delta for block", e);
+ }
+ }
+
+ /**
+ * iterate through values in each mini block and calculate the bitWidths of max values.
+ *
+ * @param miniBlocksToFlush
+ */
+ private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+ for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+ int mask = 0;
+ int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
+
+ //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
+ int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
+
+ for (int i = miniStart; i < miniEnd; i++) {
+ mask |= deltaBlockBuffer[i];
+ }
+ bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
+ }
+ }
+
+ /**
+ * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+ *
+ * @return
+ */
+ @Override
+ public BytesInput getBytes() {
+ // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+ if (deltaValuesToFlush != 0) {
+ flushBlockBuffer();
+ }
+ return BytesInput.concat(
+ config.toBytesInput(),
+ BytesInput.fromUnsignedVarInt(totalValueCount),
+ BytesInput.fromZigZagVarInt(firstValue),
+ BytesInput.from(baos));
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java
new file mode 100644
index 0000000..30eecef
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLong.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import java.io.IOException;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Write longs (INT64) with delta encoding and binary packing.
+ *
+ * @author Vassil Lunchev
+ */
+public class DeltaBinaryPackingValuesWriterForLong extends DeltaBinaryPackingValuesWriter {
+ /**
+ * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+ * reused between flushes.
+ */
+ private static final int MAX_BITWIDTH = 64;
+
+ /**
+ * stores delta values starting from the 2nd value written(1st value is stored in header).
+ * It's reused between flushes
+ */
+ private long[] deltaBlockBuffer;
+
+ /**
+ * firstValue is written to the header of the page
+ */
+ private long firstValue = 0;
+
+ /**
+ * cache previous written value for calculating delta
+ */
+ private long previousValue = 0;
+
+ /**
+ * min delta is written to the beginning of each block.
+ * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+ * therefore are all positive
+ * it will be reset after each flush
+ */
+ private long minDeltaInCurrentBlock = Long.MAX_VALUE;
+
+ public DeltaBinaryPackingValuesWriterForLong(
+ int slabSize, int pageSize, ByteBufferAllocator allocator) {
+ this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
+ }
+
+ public DeltaBinaryPackingValuesWriterForLong(int blockSizeInValues, int miniBlockNum,
+ int slabSize, int pageSize, ByteBufferAllocator allocator) {
+ super(blockSizeInValues, miniBlockNum, slabSize, pageSize, allocator);
+ deltaBlockBuffer = new long[config.blockSizeInValues];
+ miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
+ }
+
+ @Override
+ public void writeLong(long v) {
+ totalValueCount++;
+
+ if (totalValueCount == 1) {
+ firstValue = v;
+ previousValue = firstValue;
+ return;
+ }
+
+ // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+ // Java long is working as a modular ring with base 2^64 and because of the plus and minus
+ // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+ long delta = v - previousValue;
+ previousValue = v;
+
+ deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+ if (delta < minDeltaInCurrentBlock) {
+ minDeltaInCurrentBlock = delta;
+ }
+
+ if (config.blockSizeInValues == deltaValuesToFlush) {
+ flushBlockBuffer();
+ }
+ }
+
+ private void flushBlockBuffer() {
+ // since we store the min delta, the deltas will be converted to be the difference to min delta
+ // and all positive
+ for (int i = 0; i < deltaValuesToFlush; i++) {
+ deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+ }
+
+ writeMinDelta();
+ int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+ calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+ for (int i = 0; i < config.miniBlockNumInABlock; i++) {
+ writeBitWidthForMiniBlock(i);
+ }
+
+ for (int i = 0; i < miniBlocksToFlush; i++) {
+ // writing i th miniblock
+ int currentBitWidth = bitWidths[i];
+ int blockOffset = 0;
+ // TODO: should this cache the packer?
+ BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth);
+ int miniBlockStart = i * config.miniBlockSizeInValues;
+ // pack values into the miniblock buffer, 8 at a time to get exactly currentBitWidth bytes
+ for (int j = miniBlockStart; j < (i + 1) * config.miniBlockSizeInValues; j += 8) {
+ // mini block is atomic in terms of flushing
+ // This may write extra values when the data ends inside the last mini block,
+ // since the data may not be aligned to a mini block boundary,
+ // but that doesn't matter: the reader uses the total count to detect the end.
+ packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+ blockOffset += currentBitWidth;
+ }
+ baos.write(miniBlockByteBuffer, 0, blockOffset);
+ }
+
+ minDeltaInCurrentBlock = Long.MAX_VALUE;
+ deltaValuesToFlush = 0;
+ }
+
+ private void writeMinDelta() {
+ try {
+ BytesUtils.writeZigZagVarLong(minDeltaInCurrentBlock, baos);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("can not write min delta for block", e);
+ }
+ }
+
+ /**
+ * iterate through values in each mini block and calculate the bitWidths of max values.
+ *
+ * @param miniBlocksToFlush
+ */
+ private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+ for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+ long mask = 0;
+ int miniStart = miniBlockIndex * config.miniBlockSizeInValues;
+
+ //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer when data is not aligned to mini block
+ int miniEnd = Math.min((miniBlockIndex + 1) * config.miniBlockSizeInValues, deltaValuesToFlush);
+
+ for (int i = miniStart; i < miniEnd; i++) {
+ mask |= deltaBlockBuffer[i];
+ }
+ bitWidths[miniBlockIndex] = 64 - Long.numberOfLeadingZeros(mask);
+ }
+ }
+
+ /**
+ * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+ *
+ * @return
+ */
+ @Override
+ public BytesInput getBytes() {
+ // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+ if (deltaValuesToFlush != 0) {
+ flushBlockBuffer();
+ }
+ return BytesInput.concat(
+ config.toBytesInput(),
+ BytesInput.fromUnsignedVarInt(totalValueCount),
+ BytesInput.fromZigZagVarLong(firstValue),
+ BytesInput.from(baos));
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
index 2d6b213..f7ad912 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -28,6 +28,7 @@ import org.apache.parquet.bytes.LittleEndianDataOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
@@ -52,7 +53,7 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
out = new LittleEndianDataOutputStream(arrayOut);
- lengthWriter = new DeltaBinaryPackingValuesWriter(
+ lengthWriter = new DeltaBinaryPackingValuesWriterForInteger(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
initialSize, pageSize, allocator);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
index 1604ddb..fb6cc9b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -23,6 +23,7 @@ import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
import org.apache.parquet.io.api.Binary;
@@ -43,7 +44,8 @@ public class DeltaByteArrayWriter extends ValuesWriter{
private byte[] previous;
public DeltaByteArrayWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
- this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize, allocator);
+ this.prefixLengthWriter =
+ new DeltaBinaryPackingValuesWriterForInteger(128, 4, initialCapacity, pageSize, allocator);
this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize, allocator);
this.previous = new byte[0];
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
new file mode 100644
index 0000000..a3bec4a
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class DeltaBinaryPackingValuesWriterForIntegerTest {
+ DeltaBinaryPackingValuesReader reader;
+ private int blockSize;
+ private int miniBlockNum;
+ private ValuesWriter writer;
+ private Random random;
+
+ @Before
+ public void setUp() {
+ blockSize = 128;
+ miniBlockNum = 4;
+ writer = new DeltaBinaryPackingValuesWriterForInteger(
+ blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
+ random = new Random(0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void miniBlockSizeShouldBeMultipleOf8() {
+ new DeltaBinaryPackingValuesWriterForInteger(
+ 1281, 4, 100, 100, new DirectByteBufferAllocator());
+ }
+
+ /* When data size is multiple of Block*/
+ @Test
+ public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
+ int[] data = new int[5 * blockSize];
+ for (int i = 0; i < blockSize * 5; i++) {
+ data[i] = random.nextInt();
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
+ int[] data = new int[blockSize - 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = random.nextInt();
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
+ int miniBlockSize = blockSize / miniBlockNum;
+ int[] data = new int[miniBlockSize - 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = random.nextInt();
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteNegativeDeltas() throws IOException {
+ int[] data = new int[blockSize];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = 10 - (i * 32 - random.nextInt(6));
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
+ int[] data = new int[2 * blockSize];
+ for (int i = 0; i < blockSize; i++) {
+ data[i] = i * 32;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
+ int[] data = new int[2 * blockSize];
+ for (int i = 0; i < blockSize; i++) {
+ data[i] = 3;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
+ int[] data = new int[5 * blockSize + 1];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (i - 1) / blockSize;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
+ int[] data = new int[5 * blockSize + 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = random.nextInt(20) - 10;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldReadMaxMinValue() throws IOException {
+ int[] data = new int[10];
+ for (int i = 0; i < data.length; i++) {
+ if (i % 2 == 0) {
+ data[i] = Integer.MIN_VALUE;
+ } else {
+ data[i] = Integer.MAX_VALUE;
+ }
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+ int[] data = new int[2 * blockSize + 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = i * 32;
+ }
+ writeData(data);
+
+ reader = new DeltaBinaryPackingValuesReader();
+ BytesInput bytes = writer.getBytes();
+ byte[] valueContent = bytes.toByteArray();
+ byte[] pageContent = new byte[valueContent.length * 10];
+ int contentOffsetInPage = 33;
+ System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
+
+ //offset should be correct
+ reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
+ int offset= reader.getNextOffset();
+ assertEquals(valueContent.length + contentOffsetInPage, offset);
+
+ //should be able to read data correctly
+ for (int i : data) {
+ assertEquals(i, reader.readInteger());
+ }
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
+ int[] data = new int[5 * blockSize + 1];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = i * 32;
+ }
+ shouldWriteAndRead(data);
+ try {
+ reader.readInteger();
+ } catch (ParquetDecodingException e) {
+ assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void shouldSkip() throws IOException {
+ int[] data = new int[5 * blockSize + 1];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = i * 32;
+ }
+ writeData(data);
+ reader = new DeltaBinaryPackingValuesReader();
+ reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+ for (int i = 0; i < data.length; i++) {
+ if (i % 3 == 0) {
+ reader.skip();
+ } else {
+ assertEquals(i * 32, reader.readInteger());
+ }
+ }
+ }
+
+ @Test
+ public void shouldReset() throws IOException {
+ shouldReadWriteWhenDataIsNotAlignedWithBlock();
+ int[] data = new int[5 * blockSize];
+ for (int i = 0; i < blockSize * 5; i++) {
+ data[i] = i * 2;
+ }
+ writer.reset();
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void randomDataTest() throws IOException {
+ int maxSize = 1000;
+ int[] data = new int[maxSize];
+
+ for (int round = 0; round < 100000; round++) {
+
+
+ int size = random.nextInt(maxSize);
+
+ for (int i = 0; i < size; i++) {
+ data[i] = random.nextInt();
+ }
+ shouldReadAndWrite(data, size);
+ writer.reset();
+ }
+ }
+
+ private void shouldWriteAndRead(int[] data) throws IOException {
+ shouldReadAndWrite(data, data.length);
+ }
+
+ private void shouldReadAndWrite(int[] data, int length) throws IOException {
+ writeData(data, length);
+ reader = new DeltaBinaryPackingValuesReader();
+ byte[] page = writer.getBytes().toByteArray();
+ int miniBlockSize = blockSize / miniBlockNum;
+
+ double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
+ double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
+ double estimatedSize = 4 * 5 //blockHeader
+ + 4 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
+ + blockFlushed * miniBlockNum //bitWidth of mini blocks
+ + (5.0 * blockFlushed);//min delta for each block
+ assertTrue(estimatedSize >= page.length);
+ reader.initFromPage(100, ByteBuffer.wrap(page), 0);
+
+ for (int i = 0; i < length; i++) {
+ assertEquals(data[i], reader.readInteger());
+ }
+ }
+
+ private void writeData(int[] data) {
+ writeData(data, data.length);
+ }
+
+ private void writeData(int[] data, int length) {
+ for (int i = 0; i < length; i++) {
+ writer.writeInteger(data[i]);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
new file mode 100644
index 0000000..34e1800
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.delta;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class DeltaBinaryPackingValuesWriterForLongTest {
+ DeltaBinaryPackingValuesReader reader;
+ private int blockSize;
+ private int miniBlockNum;
+ private ValuesWriter writer;
+ private Random random;
+
+ @Before
+ public void setUp() {
+ blockSize = 128;
+ miniBlockNum = 4;
+ writer = new DeltaBinaryPackingValuesWriterForLong(
+ blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
+ random = new Random(0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void miniBlockSizeShouldBeMultipleOf8() {
+ new DeltaBinaryPackingValuesWriterForLong(
+ 1281, 4, 100, 100, new DirectByteBufferAllocator());
+ }
+
+ /* When the data size is a multiple of the block size */
+ @Test
+ public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
+ long[] data = new long[5 * blockSize];
+ for (int i = 0; i < blockSize * 5; i++) {
+ data[i] = random.nextLong();
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
+ long[] data = new long[blockSize - 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = random.nextLong();
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
+ int miniBlockSize = blockSize / miniBlockNum;
+ long[] data = new long[miniBlockSize - 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = random.nextLong();
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteNegativeDeltas() throws IOException {
+ long[] data = new long[blockSize];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = 10 - (i * 32 - random.nextInt(6));
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
+ long[] data = new long[2 * blockSize];
+ for (int i = 0; i < blockSize; i++) {
+ data[i] = i * 32;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
+ long[] data = new long[2 * blockSize];
+ for (int i = 0; i < blockSize; i++) {
+ data[i] = 3;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
+ long[] data = new long[5 * blockSize + 1];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (i - 1) / blockSize;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
+ long[] data = new long[5 * blockSize + 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = random.nextInt(20) - 10;
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldReadMaxMinValue() throws IOException {
+ long[] data = new long[10];
+ for (int i = 0; i < data.length; i++) {
+ if (i % 2 == 0) {
+ data[i] = Long.MIN_VALUE;
+ } else {
+ data[i] = Long.MAX_VALUE;
+ }
+ }
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+ long[] data = new long[2 * blockSize + 3];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = i * 32;
+ }
+ writeData(data);
+
+ reader = new DeltaBinaryPackingValuesReader();
+ BytesInput bytes = writer.getBytes();
+ byte[] valueContent = bytes.toByteArray();
+ byte[] pageContent = new byte[valueContent.length * 10];
+ int contentOffsetInPage = 33;
+ System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
+
+ //offset should be correct
+ reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
+ int offset = reader.getNextOffset();
+ assertEquals(valueContent.length + contentOffsetInPage, offset);
+
+ //should be able to read data correctly
+ for (long i : data) {
+ assertEquals(i, reader.readLong());
+ }
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
+ long[] data = new long[5 * blockSize + 1];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = i * 32;
+ }
+ shouldWriteAndRead(data);
+ try {
+ reader.readLong();
+ } catch (ParquetDecodingException e) {
+ assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldSkip() throws IOException {
+ long[] data = new long[5 * blockSize + 1];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = i * 32;
+ }
+ writeData(data);
+ reader = new DeltaBinaryPackingValuesReader();
+ reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+ for (int i = 0; i < data.length; i++) {
+ if (i % 3 == 0) {
+ reader.skip();
+ } else {
+ assertEquals(i * 32, reader.readLong());
+ }
+ }
+ }
+
+ @Test
+ public void shouldReset() throws IOException {
+ shouldReadWriteWhenDataIsNotAlignedWithBlock();
+ long[] data = new long[5 * blockSize];
+ for (int i = 0; i < blockSize * 5; i++) {
+ data[i] = i * 2;
+ }
+ writer.reset();
+ shouldWriteAndRead(data);
+ }
+
+ @Test
+ public void randomDataTest() throws IOException {
+ int maxSize = 1000;
+ long[] data = new long[maxSize];
+
+ for (int round = 0; round < 100000; round++) {
+ int size = random.nextInt(maxSize);
+
+ for (int i = 0; i < size; i++) {
+ data[i] = random.nextLong();
+ }
+ shouldReadAndWrite(data, size);
+ writer.reset();
+ }
+ }
+
+ private void shouldWriteAndRead(long[] data) throws IOException {
+ shouldReadAndWrite(data, data.length);
+ }
+
+ private void shouldReadAndWrite(long[] data, int length) throws IOException {
+ writeData(data, length);
+ reader = new DeltaBinaryPackingValuesReader();
+ byte[] page = writer.getBytes().toByteArray();
+ int miniBlockSize = blockSize / miniBlockNum;
+
+ double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
+ double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
+ double estimatedSize = 3 * 5 + 1 * 10 //blockHeader, 3 * int + 1 * long
+ + 8 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
+ + blockFlushed * miniBlockNum //bitWidth of mini blocks
+ + (10.0 * blockFlushed);//min delta for each block
+ assertTrue(estimatedSize >= page.length);
+ reader.initFromPage(100, page, 0);
+
+ for (int i = 0; i < length; i++) {
+ assertEquals(data[i], reader.readLong());
+ }
+ }
+
+ private void writeData(long[] data) {
+ writeData(data, data.length);
+ }
+
+ private void writeData(long[] data, int length) {
+ for (int i = 0; i < length; i++) {
+ writer.writeLong(data[i]);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
deleted file mode 100644
index 6308e47..0000000
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.column.values.delta;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Random;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.parquet.bytes.DirectByteBufferAllocator;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.values.ValuesWriter;
-import org.apache.parquet.io.ParquetDecodingException;
-
-public class DeltaBinaryPackingValuesWriterTest {
- DeltaBinaryPackingValuesReader reader;
- private int blockSize;
- private int miniBlockNum;
- private ValuesWriter writer;
- private Random random;
-
- @Before
- public void setUp() {
- blockSize = 128;
- miniBlockNum = 4;
- writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
- random = new Random();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void miniBlockSizeShouldBeMultipleOf8() {
- new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100, new DirectByteBufferAllocator());
- }
-
- /* When data size is multiple of Block*/
- @Test
- public void shouldWriteWhenDataIsAlignedWithBlock() throws IOException {
- int[] data = new int[5 * blockSize];
- for (int i = 0; i < blockSize * 5; i++) {
- data[i] = random.nextInt();
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldWriteAndReadWhenBlockIsNotFullyWritten() throws IOException {
- int[] data = new int[blockSize - 3];
- for (int i = 0; i < data.length; i++) {
- data[i] = random.nextInt();
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldWriteAndReadWhenAMiniBlockIsNotFullyWritten() throws IOException {
- int miniBlockSize = blockSize / miniBlockNum;
- int[] data = new int[miniBlockSize - 3];
- for (int i = 0; i < data.length; i++) {
- data[i] = random.nextInt();
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldWriteNegativeDeltas() throws IOException {
- int[] data = new int[blockSize];
- for (int i = 0; i < data.length; i++) {
- data[i] = 10 - (i * 32 - random.nextInt(6));
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldWriteAndReadWhenDeltasAreSame() throws IOException {
- int[] data = new int[2 * blockSize];
- for (int i = 0; i < blockSize; i++) {
- data[i] = i * 32;
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldWriteAndReadWhenValuesAreSame() throws IOException {
- int[] data = new int[2 * blockSize];
- for (int i = 0; i < blockSize; i++) {
- data[i] = 3;
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldWriteWhenDeltaIs0ForEachBlock() throws IOException {
- int[] data = new int[5 * blockSize + 1];
- for (int i = 0; i < data.length; i++) {
- data[i] = (i - 1) / blockSize;
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldReadWriteWhenDataIsNotAlignedWithBlock() throws IOException {
- int[] data = new int[5 * blockSize + 3];
- for (int i = 0; i < data.length; i++) {
- data[i] = random.nextInt(20) - 10;
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldReadMaxMinValue() throws IOException {
- int[] data = new int[10];
- for (int i = 0; i < data.length; i++) {
- if(i%2==0) {
- data[i]=Integer.MIN_VALUE;
- }else {
- data[i]=Integer.MAX_VALUE;
- }
- }
- shouldWriteAndRead(data);
- }
-
- @Test
- public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
- int[] data = new int[2 * blockSize + 3];
- for (int i = 0; i < data.length; i++) {
- data[i] = i * 32;
- }
- writeData(data);
-
- reader = new DeltaBinaryPackingValuesReader();
- BytesInput bytes = writer.getBytes();
- byte[] valueContent = bytes.toByteArray();
- byte[] pageContent = new byte[valueContent.length * 10];
- int contentOffsetInPage = 33;
- System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
-
- //offset should be correct
- reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
- int offset= reader.getNextOffset();
- assertEquals(valueContent.length + contentOffsetInPage, offset);
-
- //should be able to read data correclty
- for (int i : data) {
- assertEquals(i, reader.readInteger());
- }
- }
-
- @Test
- public void shouldThrowExceptionWhenReadMoreThanWritten() throws IOException {
- int[] data = new int[5 * blockSize + 1];
- for (int i = 0; i < data.length; i++) {
- data[i] = i * 32;
- }
- shouldWriteAndRead(data);
- try {
- reader.readInteger();
- } catch (ParquetDecodingException e) {
- assertEquals("no more value to read, total value count is " + data.length, e.getMessage());
- }
-
- }
-
- @Test
- public void shouldSkip() throws IOException {
- int[] data = new int[5 * blockSize + 1];
- for (int i = 0; i < data.length; i++) {
- data[i] = i * 32;
- }
- writeData(data);
- reader = new DeltaBinaryPackingValuesReader();
- reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
- for (int i = 0; i < data.length; i++) {
- if (i % 3 == 0) {
- reader.skip();
- } else {
- assertEquals(i * 32, reader.readInteger());
- }
- }
- }
-
- @Test
- public void shouldReset() throws IOException {
- shouldReadWriteWhenDataIsNotAlignedWithBlock();
- int[] data = new int[5 * blockSize];
- for (int i = 0; i < blockSize * 5; i++) {
- data[i] = i * 2;
- }
- writer.reset();
- shouldWriteAndRead(data);
- }
-
- @Test
- public void randomDataTest() throws IOException {
- int maxSize = 1000;
- int[] data = new int[maxSize];
-
- for (int round = 0; round < 100000; round++) {
-
-
- int size = random.nextInt(maxSize);
-
- for (int i = 0; i < size; i++) {
- data[i] = random.nextInt();
- }
- shouldReadAndWrite(data, size);
- writer.reset();
- }
- }
-
- private void shouldWriteAndRead(int[] data) throws IOException {
- shouldReadAndWrite(data, data.length);
- }
-
- private void shouldReadAndWrite(int[] data, int length) throws IOException {
- writeData(data, length);
- reader = new DeltaBinaryPackingValuesReader();
- byte[] page = writer.getBytes().toByteArray();
- int miniBlockSize = blockSize / miniBlockNum;
-
- double miniBlockFlushed = Math.ceil(((double) length - 1) / miniBlockSize);
- double blockFlushed = Math.ceil(((double) length - 1) / blockSize);
- double estimatedSize = 4 * 5 //blockHeader
- + 4 * miniBlockFlushed * miniBlockSize //data(aligned to miniBlock)
- + blockFlushed * miniBlockNum //bitWidth of mini blocks
- + (5.0 * blockFlushed);//min delta for each block
- assertTrue(estimatedSize >= page.length);
- reader.initFromPage(100, ByteBuffer.wrap(page), 0);
-
- for (int i = 0; i < length; i++) {
- assertEquals(data[i], reader.readInteger());
- }
- }
-
- private void writeData(int[] data) {
- writeData(data, data.length);
- }
-
- private void writeData(int[] data, int length) {
- for (int i = 0; i < length; i++) {
- writer.writeInteger(data[i]);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
index 40f6bfc..43cce3a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
@@ -18,11 +18,13 @@
*/
package org.apache.parquet.column.values.delta.benchmark;
-import org.junit.Test;
+import java.util.Random;
+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import java.util.Random;
+import org.junit.Test;
public class BenchmarkIntegerOutputSize {
public static int blockSize=128;
@@ -78,8 +80,10 @@ public class BenchmarkIntegerOutputSize {
}
public void testRandomIntegers(IntFunc func,int bitWidth) {
- DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
- RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000, new DirectByteBufferAllocator());
+ DeltaBinaryPackingValuesWriter delta = new DeltaBinaryPackingValuesWriterForInteger(
+ blockSize,miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+ RunLengthBitPackingHybridValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(
+ bitWidth, 100, 20000, new DirectByteBufferAllocator());
for (int i = 0; i < dataSize; i++) {
int v = func.getIntValue();
delta.writeInteger(v);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index 4ad5dad..488208c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -18,24 +18,25 @@
*/
package org.apache.parquet.column.values.delta.benchmark;
-import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
-import com.carrotsearch.junitbenchmarks.BenchmarkRule;
-import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
-import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Random;
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
@AxisRange(min = 0, max = 1)
@BenchmarkMethodChart(filePrefix = "benchmark-encoding-reading-random")
@@ -56,8 +57,10 @@ public class BenchmarkReadingRandomIntegers {
data[i] = random.nextInt(100) - 200;
}
- ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
- ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator());
+ ValuesWriter delta = new DeltaBinaryPackingValuesWriterForInteger(
+ blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+ ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(
+ 32, 100, 20000, new DirectByteBufferAllocator());
for (int i = 0; i < data.length; i++) {
delta.writeInteger(data[i]);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
index 80e6533..f63eeda 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
@@ -18,18 +18,21 @@
*/
package org.apache.parquet.column.values.delta.benchmark;
-import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
-import com.carrotsearch.junitbenchmarks.BenchmarkRule;
-import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
-import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import java.util.Random;
+
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
-import java.util.Random;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.carrotsearch.junitbenchmarks.BenchmarkOptions;
+import com.carrotsearch.junitbenchmarks.BenchmarkRule;
+import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
+import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
@AxisRange(min = 0, max = 1)
@BenchmarkMethodChart(filePrefix = "benchmark-encoding-writing-random")
@@ -51,7 +54,8 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{
@BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
@Test
public void writeDeltaPackingTest(){
- DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+ DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriterForInteger(
+ blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
runWriteTest(writer);
}
@@ -65,7 +69,8 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{
@BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
@Test
public void writeDeltaPackingTest2(){
- DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+ DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriterForInteger(
+ blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
runWriteTest(writer);
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
index d40721a..049f7bd 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
@@ -234,6 +234,42 @@ public class BytesUtils {
}
/**
+ * Uses a trick mentioned in https://developers.google.com/protocol-buffers/docs/encoding to read zigzag-encoded data.
+ * TODO: the implementation is compatible with readZigZagVarInt. Is there a need for different functions?
+ * @param in the stream to read the zigzag-encoded var long from
+ * @return the decoded long value
+ * @throws IOException if reading from the stream fails
+ */
+ public static long readZigZagVarLong(InputStream in) throws IOException {
+ long raw = readUnsignedVarLong(in);
+ long temp = (((raw << 63) >> 63) ^ raw) >> 1;
+ return temp ^ (raw & (1L << 63));
+ }
+
+ public static long readUnsignedVarLong(InputStream in) throws IOException {
+ long value = 0;
+ int i = 0;
+ long b;
+ while (((b = in.read()) & 0x80) != 0) {
+ value |= (b & 0x7F) << i;
+ i += 7;
+ }
+ return value | (b << i);
+ }
+
+ public static void writeUnsignedVarLong(long value, OutputStream out) throws IOException {
+ while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
+ out.write((int)((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ out.write((int)(value & 0x7F));
+ }
+
+ public static void writeZigZagVarLong(long longValue, OutputStream out) throws IOException{
+ writeUnsignedVarLong((longValue << 1) ^ (longValue >> 63), out);
+ }
+
+ /**
* @param bitLength a count of bits
* @return the corresponding byte count padded to the next byte
*/
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 40190ee..cd9c6b2 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -125,6 +125,23 @@ abstract public class BytesInput {
}
/**
+ * @param longValue the long to write
+ * @return a BytesInput that will write var long
+ */
+ public static BytesInput fromUnsignedVarLong(long longValue) {
+ return new UnsignedVarLongBytesInput(longValue);
+ }
+
+ /**
+ * @param longValue the long to write
+ * @return a BytesInput that will write the zigzag-encoded value as a var long
+ */
+ public static BytesInput fromZigZagVarLong(long longValue) {
+ long zigZag = (longValue << 1) ^ (longValue >> 63);
+ return new UnsignedVarLongBytesInput(zigZag);
+ }
+
+ /**
* @param arrayOut
* @return a BytesInput that will write the content of the buffer
*/
@@ -320,7 +337,27 @@ abstract public class BytesInput {
@Override
public long size() {
- int s = 5 - ((Integer.numberOfLeadingZeros(intValue) + 3) / 7);
+ int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7;
+ return s == 0 ? 1 : s;
+ }
+ }
+
+ private static class UnsignedVarLongBytesInput extends BytesInput {
+
+ private final long longValue;
+
+ public UnsignedVarLongBytesInput(long longValue) {
+ this.longValue = longValue;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ BytesUtils.writeUnsignedVarLong(longValue, out);
+ }
+
+ @Override
+ public long size() {
+ int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7;
return s == 0 ? 1 : s;
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java
new file mode 100644
index 0000000..9859f5b
--- /dev/null
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLong.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Packs and unpacks INT64 into bytes
+ *
+ * Packing and unpacking handle: - n values at a time (with n % 8 == 0) - bitWidth * (n/8) bytes
+ * at a time.
+ *
+ * @author Vassil Lunchev
+ *
+ */
+public abstract class BytePackerForLong {
+
+ private final int bitWidth;
+
+ BytePackerForLong(int bitWidth) {
+ this.bitWidth = bitWidth;
+ }
+
+ /**
+ * @return the width in bits used for encoding, also how many bytes are packed/unpacked at a time
+ * by pack8Values/unpack8Values
+ */
+ public final int getBitWidth() {
+ return bitWidth;
+ }
+
+ /**
+ * pack 8 values from input at inPos into bitWidth bytes in output at outPos. nextPosition: inPos
+ * += 8; outPos += getBitWidth()
+ *
+ * @param input the input values
+ * @param inPos where to read from in input
+ * @param output the output bytes
+ * @param outPos where to write to in output
+ */
+ public abstract void pack8Values(final long[] input, final int inPos, final byte[] output,
+ final int outPos);
+
+ /**
+ * pack 32 values from input at inPos into bitWidth * 4 bytes in output at outPos. nextPosition:
+ * inPos += 32; outPos += getBitWidth() * 4
+ *
+ * @param input the input values
+ * @param inPos where to read from in input
+ * @param output the output bytes
+ * @param outPos where to write to in output
+ */
+ public abstract void pack32Values(long[] input, int inPos, byte[] output, int outPos);
+
+ /**
+ * unpack bitWidth bytes from input at inPos into 8 values in output at outPos. nextPosition:
+ * inPos += getBitWidth(); outPos += 8
+ *
+ * @param input the input bytes
+ * @param inPos where to read from in input
+ * @param output the output values
+ * @param outPos where to write to in output
+ */
+ public abstract void unpack8Values(final ByteBuffer input, final int inPos, final long[] output,
+ final int outPos);
+
+ /**
+ * unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos. nextPosition:
+ * inPos += getBitWidth() * 4; outPos += 32
+ *
+ * @param input the input bytes
+ * @param inPos where to read from in input
+ * @param output the output values
+ * @param outPos where to write to in output
+ */
+ public abstract void unpack32Values(ByteBuffer input, int inPos, long[] output, int outPos);
+
+ /**
+ * unpack bitWidth bytes from input at inPos into 8 values in output at outPos. nextPosition:
+ * inPos += getBitWidth(); outPos += 8
+ *
+ * @param input the input bytes
+ * @param inPos where to read from in input
+ * @param output the output values
+ * @param outPos where to write to in output
+ */
+ public abstract void unpack8Values(final byte[] input, final int inPos, final long[] output,
+ final int outPos);
+
+ /**
+ * unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos. nextPosition:
+ * inPos += getBitWidth() * 4; outPos += 32
+ *
+ * @param input the input bytes
+ * @param inPos where to read from in input
+ * @param output the output values
+ * @param outPos where to write to in output
+ */
+ public abstract void unpack32Values(byte[] input, int inPos, long[] output, int outPos);
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java
new file mode 100644
index 0000000..39086ac
--- /dev/null
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePackerForLongFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+public interface BytePackerForLongFactory {
+ /** Returns a byte based packer for long (INT64) values packed with the given width in bits. */
+ BytePackerForLong newBytePackerForLong(int width);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
index ed14edf..5c56941 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/Packer.java
@@ -39,6 +39,10 @@ public enum Packer {
public BytePacker newBytePacker(int width) {
return beBytePackerFactory.newBytePacker(width);
}
+ @Override
+ public BytePackerForLong newBytePackerForLong(int width) {
+ return beBytePackerForLongFactory.newBytePackerForLong(width); // delegates to the factory loaded from ByteBitPackingForLongBE
+ }
},
/**
@@ -54,6 +58,10 @@ public enum Packer {
public BytePacker newBytePacker(int width) {
return leBytePackerFactory.newBytePacker(width);
}
+ @Override
+ public BytePackerForLong newBytePackerForLong(int width) {
+ return leBytePackerForLongFactory.newBytePackerForLong(width); // delegates to the factory loaded from ByteBitPackingForLongLE
+ }
};
private static IntPackerFactory getIntPackerFactory(String name) {
@@ -64,6 +72,10 @@ public enum Packer {
return (BytePackerFactory)getStaticField("org.apache.parquet.column.values.bitpacking." + name, "factory");
}
+ private static BytePackerForLongFactory getBytePackerForLongFactory(String name) {
+ return (BytePackerForLongFactory)getStaticField("org.apache.parquet.column.values.bitpacking." + name, "factory"); // reflectively reads the static "factory" field of the named bitpacking class
+ }
+
private static Object getStaticField(String className, String fieldName) {
try {
return Class.forName(className).getField(fieldName).get(null);
@@ -80,10 +92,12 @@ public enum Packer {
}
}
- static BytePackerFactory beBytePackerFactory = getBytePackerFactory("ByteBitPackingBE");
static IntPackerFactory beIntPackerFactory = getIntPackerFactory("LemireBitPackingBE");
- static BytePackerFactory leBytePackerFactory = getBytePackerFactory("ByteBitPackingLE");
static IntPackerFactory leIntPackerFactory = getIntPackerFactory("LemireBitPackingLE");
+ static BytePackerFactory beBytePackerFactory = getBytePackerFactory("ByteBitPackingBE");
+ static BytePackerFactory leBytePackerFactory = getBytePackerFactory("ByteBitPackingLE");
+ static BytePackerForLongFactory beBytePackerForLongFactory = getBytePackerForLongFactory("ByteBitPackingForLongBE"); // long-value factory, resolved by class name
+ static BytePackerForLongFactory leBytePackerForLongFactory = getBytePackerForLongFactory("ByteBitPackingForLongLE"); // long-value factory, resolved by class name
/**
* @param width the width in bits of the packed values
@@ -96,4 +110,10 @@ public enum Packer {
* @return a byte based packer
*/
public abstract BytePacker newBytePacker(int width);
+
+ /**
+ * @param width the width in bits of the packed values
+ * @return a byte based packer for INT64 (long) values
+ */
+ public abstract BytePackerForLong newBytePackerForLong(int width);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bcfe6c5/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
index 64679e5..ce9b3ac 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPacking.java
@@ -196,6 +196,20 @@ public class TestBitPacking {
return sb.toString();
}
+ /**
+ * Joins the given values into one string, separated by single spaces.
+ */
+ public static String toString(long[] vals) {
+ final StringBuilder joined = new StringBuilder();
+ for (int idx = 0; idx < vals.length; idx++) {
+ if (idx > 0) {
+ joined.append(" ");
+ }
+ joined.append(vals[idx]);
+ }
+ return joined.toString();
+ }
+
public static String toString(byte[] bytes) {
StringBuilder sb = new StringBuilder();
boolean first = true;