You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2015/11/04 18:57:34 UTC
[2/4] parquet-mr git commit: PARQUET-77: ByteBuffer use in read and
write paths
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
index b62ef84..770f4dc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.junit.Test;
import org.junit.Assert;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.Utils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -35,7 +36,7 @@ public class TestDeltaByteArray {
@Test
public void testSerialization () throws Exception {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+ DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
DeltaByteArrayReader reader = new DeltaByteArrayReader();
assertReadWrite(writer, reader, values);
@@ -43,14 +44,14 @@ public class TestDeltaByteArray {
@Test
public void testRandomStrings() throws Exception {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+ DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
DeltaByteArrayReader reader = new DeltaByteArrayReader();
assertReadWrite(writer, reader, randvalues);
}
@Test
public void testLengths() throws IOException {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+ DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
ValuesReader reader = new DeltaBinaryPackingValuesReader();
Utils.writeData(writer, values);
@@ -82,7 +83,7 @@ public class TestDeltaByteArray {
@Test
public void testWriterReset() throws Exception {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+ DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
assertReadWrite(writer, new DeltaByteArrayReader(), values);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
index c61ef30..eac4bd2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.Utils;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
@@ -54,7 +55,7 @@ public class BenchmarkDeltaByteArray {
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@Test
public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException {
- PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+ PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
Utils.writeData(writer, values);
@@ -66,7 +67,7 @@ public class BenchmarkDeltaByteArray {
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@Test
public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+ DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
DeltaByteArrayReader reader = new DeltaByteArrayReader();
Utils.writeData(writer, values);
@@ -78,7 +79,7 @@ public class BenchmarkDeltaByteArray {
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@Test
public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException {
- PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+ PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
Utils.writeData(writer, sortedVals);
@@ -90,7 +91,7 @@ public class BenchmarkDeltaByteArray {
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@Test
public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+ DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
DeltaByteArrayReader reader = new DeltaByteArrayReader();
Utils.writeData(writer, sortedVals);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index 020868e..ada1c93 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -28,10 +28,12 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -54,27 +56,27 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
public class TestDictionary {
private <I extends DictionaryValuesWriter> FallbackValuesWriter<I, PlainValuesWriter> plainFallBack(I dvw, int initialSize) {
- return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5));
+ return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5, new DirectByteBufferAllocator()));
}
private FallbackValuesWriter<PlainBinaryDictionaryValuesWriter, PlainValuesWriter> newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
- return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+ return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
}
private FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> newPlainLongDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
- return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+ return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
}
private FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> newPlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
- return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+ return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
}
private FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> newPlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
- return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+ return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
}
private FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> newPlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
- return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+ return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
}
@Test
@@ -116,7 +118,7 @@ public class TestDictionary {
//Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
ValuesReader reader = new BinaryPlainValuesReader();
- reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+ reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
for (long i = 0; i < 100; i++) {
assertEquals(Binary.fromString("str" + i), reader.readBytes());
@@ -202,13 +204,13 @@ public class TestDictionary {
DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64);
- cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+ cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
for (long i = 0; i < COUNT; i++) {
long back = cr.readLong();
assertEquals(i % 50, back);
}
- cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+ cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
for (long i = COUNT2; i > 0; i--) {
long back = cr.readLong();
assertEquals(i % 50, back);
@@ -226,7 +228,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+ reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
for (long i = 0; i < 100; i++) {
assertEquals(i, reader.readLong());
@@ -272,13 +274,13 @@ public class TestDictionary {
final DictionaryValuesReader cr = initDicReader(cw, DOUBLE);
- cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+ cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
for (double i = 0; i < COUNT; i++) {
double back = cr.readDouble();
assertEquals(i % 50, back, 0.0);
}
- cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+ cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
for (double i = COUNT2; i > 0; i--) {
double back = cr.readDouble();
assertEquals(i % 50, back, 0.0);
@@ -297,7 +299,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+ reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
for (double i = 0; i < 100; i++) {
assertEquals(i, reader.readDouble(), 0.00001);
@@ -343,13 +345,13 @@ public class TestDictionary {
DictionaryValuesReader cr = initDicReader(cw, INT32);
- cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+ cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
for (int i = 0; i < COUNT; i++) {
int back = cr.readInteger();
assertEquals(i % 50, back);
}
- cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+ cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
for (int i = COUNT2; i > 0; i--) {
int back = cr.readInteger();
assertEquals(i % 50, back);
@@ -368,7 +370,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+ reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
for (int i = 0; i < 100; i++) {
assertEquals(i, reader.readInteger());
@@ -414,13 +416,13 @@ public class TestDictionary {
DictionaryValuesReader cr = initDicReader(cw, FLOAT);
- cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+ cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
for (float i = 0; i < COUNT; i++) {
float back = cr.readFloat();
assertEquals(i % 50, back, 0.0f);
}
- cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+ cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
for (float i = COUNT2; i > 0; i--) {
float back = cr.readFloat();
assertEquals(i % 50, back, 0.0f);
@@ -439,7 +441,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+ reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
for (float i = 0; i < 100; i++) {
assertEquals(i, reader.readFloat(), 0.00001);
@@ -473,14 +475,14 @@ public class TestDictionary {
DictionaryValuesReader reader = initDicReader(cw, INT32);
// pretend there are 100 nulls. what matters is offset = bytes.length.
- byte[] bytes = {0x00, 0x01, 0x02, 0x03}; // data doesn't matter
- int offset = bytes.length;
+ ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0x00, 0x01, 0x02, 0x03}); // data doesn't matter
+ int offset = bytes.remaining();
reader.initFromPage(100, bytes, offset);
}
private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type)
throws IOException {
- final DictionaryPage dictionaryPage = cw.createDictionaryPage().copy();
+ final DictionaryPage dictionaryPage = cw.toDictPageAndClose().copy();
final ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, type, 0, 0);
final Dictionary dictionary = PLAIN.initDictionary(descriptor, dictionaryPage);
final DictionaryValuesReader cr = new DictionaryValuesReader(dictionary);
@@ -488,14 +490,14 @@ public class TestDictionary {
}
private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
- cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+ cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
for (int i = 0; i < COUNT; i++) {
Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8());
}
}
private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
- cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+ cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
for (int i = 0; i < COUNT; i++) {
Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8());
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 707a507..712fb27 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.parquet.column.values.rle;
-import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import static org.junit.Assert.assertEquals;
@@ -39,7 +41,7 @@ public class RunLengthBitPackingHybridIntegrationTest {
private void doIntegrationTest(int bitWidth) throws Exception {
long modValue = 1L << bitWidth;
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000);
+ RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000, new DirectByteBufferAllocator());
int numValues = 0;
for (int i = 0; i < 100; i++) {
@@ -69,8 +71,8 @@ public class RunLengthBitPackingHybridIntegrationTest {
}
numValues += 1000;
- byte[] encodedBytes = encoder.toBytes().toByteArray();
- ByteArrayInputStream in = new ByteArrayInputStream(encodedBytes);
+ ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
+ ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes);
RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 06664de..5696d7b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -21,12 +21,15 @@ package org.apache.parquet.column.values.rle;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;
@@ -36,9 +39,19 @@ import org.apache.parquet.column.values.bitpacking.Packer;
*/
public class TestRunLengthBitPackingHybridEncoder {
+ private RunLengthBitPackingHybridEncoder getRunLengthBitPackingHybridEncoder() {
+ return getRunLengthBitPackingHybridEncoder(3, 5, 10);
+ }
+
+ private RunLengthBitPackingHybridEncoder getRunLengthBitPackingHybridEncoder(
+ int bitWidth, int initialCapacity, int pageSize) {
+ return new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity,
+ pageSize, new DirectByteBufferAllocator());
+ }
+
@Test
public void testRLEOnly() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
for (int i = 0; i < 100; i++) {
encoder.writeInt(4);
}
@@ -68,7 +81,7 @@ public class TestRunLengthBitPackingHybridEncoder {
// make sure that repeated 0s at the beginning
// of the stream don't trip up the repeat count
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
for (int i = 0; i < 10; i++) {
encoder.writeInt(0);
}
@@ -86,7 +99,7 @@ public class TestRunLengthBitPackingHybridEncoder {
@Test
public void testBitWidthZero() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5, 10);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(0, 5, 10);
for (int i = 0; i < 10; i++) {
encoder.writeInt(0);
}
@@ -102,8 +115,7 @@ public class TestRunLengthBitPackingHybridEncoder {
@Test
public void testBitPackingOnly() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
-
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
for (int i = 0; i < 100; i++) {
encoder.writeInt(i % 3);
}
@@ -125,7 +137,7 @@ public class TestRunLengthBitPackingHybridEncoder {
@Test
public void testBitPackingOverflow() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
for (int i = 0; i < 1000; i++) {
encoder.writeInt(i % 3);
@@ -157,7 +169,7 @@ public class TestRunLengthBitPackingHybridEncoder {
@Test
public void testTransitionFromBitPackingToRle() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
// 5 obviously bit-packed values
encoder.writeInt(0);
@@ -195,7 +207,7 @@ public class TestRunLengthBitPackingHybridEncoder {
@Test
public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5, 10);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(5, 5, 10);
for (int i = 0; i < 9; i++) {
encoder.writeInt(i+1);
}
@@ -214,7 +226,7 @@ public class TestRunLengthBitPackingHybridEncoder {
@Test
public void testSwitchingModes() throws Exception {
- RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100, 1000);
+ RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(9, 100, 1000);
// rle first
for (int i = 0; i < 25; i++) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
index 3abf804..aff3937 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
@@ -27,6 +27,7 @@ import static org.apache.parquet.example.Paper.schema3;
import java.util.logging.Level;
import org.apache.parquet.Log;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.page.mem.MemPageStore;
@@ -77,7 +78,7 @@ public class PerfTest {
private static void write(MemPageStore memPageStore) {
- ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+ ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
MessageColumnIO columnIO = newColumnFactory(schema);
GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
index e7274cc..06f22b6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -526,7 +527,7 @@ public class TestColumnIO {
}
private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
- return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+ return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
}
@Test
@@ -599,6 +600,8 @@ public class TestColumnIO {
groupWriter.write(r2);
recordWriter.flush();
columns.validate();
+ columns.flush();
+ columns.close();
}
}
final class ValidatingColumnWriteStore implements ColumnWriteStore {
@@ -610,6 +613,11 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore {
}
@Override
+ public void close() {
+
+ }
+
+ @Override
public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
return new ColumnWriter() {
private void validate(Object value, int repetitionLevel,
@@ -630,6 +638,11 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore {
}
@Override
+ public void write(float value, int repetitionLevel, int definitionLevel) {
+ validate(value, repetitionLevel, definitionLevel);
+ }
+
+ @Override
public void write(boolean value, int repetitionLevel, int definitionLevel) {
validate(value, repetitionLevel, definitionLevel);
}
@@ -645,8 +658,13 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore {
}
@Override
- public void write(float value, int repetitionLevel, int definitionLevel) {
- validate(value, repetitionLevel, definitionLevel);
+ public void close() {
+
+ }
+
+ @Override
+ public long getBufferedSizeInMemory() {
+ throw new UnsupportedOperationException();
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
index 9fde4b1..25b629b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
@@ -21,6 +21,7 @@ package org.apache.parquet.io;
import java.util.ArrayList;
import java.util.List;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.io.api.RecordConsumer;
import org.junit.Test;
@@ -258,7 +259,7 @@ public class TestFiltered {
private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
MemPageStore memPageStore = new MemPageStore(number * 2);
- ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);
+ ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
GroupWriter groupWriter = new GroupWriter(recordWriter, schema);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
index bd8a69d..c8444dc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.io.api;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -143,6 +145,29 @@ public class TestBinary {
}
@Test
+ public void testEqualityMethods() throws Exception {
+ Binary bin1 = Binary.fromConstantByteArray("alice".getBytes(), 1, 3);
+ Binary bin2 = Binary.fromConstantByteBuffer(ByteBuffer.wrap("alice".getBytes(), 1, 3));
+ assertEquals(bin1, bin2);
+ }
+
+ @Test
+ public void testWriteAllTo() throws Exception {
+ byte[] orig = {10, 9 ,8, 7, 6, 5, 4, 3, 2, 1};
+ testWriteAllToHelper(Binary.fromConstantByteBuffer(ByteBuffer.wrap(orig)), orig);
+ ByteBuffer buf = ByteBuffer.allocateDirect(orig.length);
+ buf.put(orig);
+ buf.flip();
+ testWriteAllToHelper(Binary.fromConstantByteBuffer(buf), orig);
+ }
+
+ private void testWriteAllToHelper(Binary binary, byte[] orig) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(orig.length);
+ binary.writeTo(out);
+ assertArrayEquals(orig, out.toByteArray());
+ }
+
+ @Test
public void testFromStringBinary() throws Exception {
testBinary(STRING_BF, false);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java b/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java
new file mode 100644
index 0000000..2ac8a2b
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Utilities for managing I/O resources.
+ */
+public class IOExceptionUtils {
+
+ /**
+   * Call the {@code close()} method on a {@link java.io.Closeable}, wrapping any IOException
+   * in a runtime exception.
+ *
+ * @param closeable - resource to close
+ */
+ public static void closeQuietly(Closeable closeable) {
+ try {
+ closeable.close();
+ } catch(IOException e) {
+ throw new ParquetRuntimeException("Error closing I/O related resources.", e) {};
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java b/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java
new file mode 100644
index 0000000..5271000
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet;
+
+/**
+ * Runtime exception indicating that a stream failed to be closed properly.
+ *
+ * Used to wrap up the checked IOException usually thrown from IO operations,
+ * these are generally not recoverable so it does not make sense to pollute the
+ * codebase declaring that they can be thrown whenever resources are being
+ * closed out.
+ */
+public class OutputStreamCloseException extends ParquetRuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Constructs the exception with no message or cause. */
+ public OutputStreamCloseException() {
+ }
+
+ /**
+ * @param message detail message describing the failed close
+ * @param cause the underlying IOException (preserved for diagnostics)
+ */
+ public OutputStreamCloseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /** @param message detail message describing the failed close */
+ public OutputStreamCloseException(String message) {
+ super(message);
+ }
+
+ /** @param cause the underlying IOException (preserved for diagnostics) */
+ public OutputStreamCloseException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java b/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
index f67b15a..d0f13a8 100644
--- a/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
+++ b/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
@@ -18,6 +18,9 @@
*/
package org.apache.parquet;
+import java.io.Closeable;
+import java.io.IOException;
+
/**
* The parent class for all runtime exceptions
*
@@ -42,5 +45,4 @@ abstract public class ParquetRuntimeException extends RuntimeException {
public ParquetRuntimeException(Throwable cause) {
super(cause);
}
-
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java
new file mode 100644
index 0000000..ee36b74
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferAllocator {
+ /**
+ * Allocate a new ByteBuffer of the given size.
+ *
+ * @param size capacity in bytes of the buffer to allocate
+ * @return a newly allocated buffer; direct or heap-backed per {@link #isDirect()}
+ */
+ ByteBuffer allocate(int size);
+
+ /**
+ * For RefCounted implementations using direct memory, the release method
+ * needs to be called to free references to the allocated memory.
+ */
+ void release(ByteBuffer b);
+
+ /**
+ * Indicates if this allocator will produce ByteBuffers backed by direct memory.
+ *
+ * @return true if direct memory backed buffers will be created by this allocator, else false
+ */
+ boolean isDirect();
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
new file mode 100644
index 0000000..5b3b853
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This ByteBufferInputStream does not consume the ByteBuffer being passed in,
+ * but will create a slice of the current buffer.
+ */
+public class ByteBufferInputStream extends InputStream {
+
+ protected ByteBuffer byteBuf; // slice holding the readable window; its position is the read cursor
+ protected int initPos; // offset in the source buffer where this stream starts
+ protected int count; // number of bytes exposed by this stream
+
+ /** Wraps the remaining bytes of the buffer (from its current position). */
+ public ByteBufferInputStream(ByteBuffer buffer) {
+ this(buffer, buffer.position(), buffer.remaining());
+ }
+
+ /**
+ * Wraps {@code count} bytes of the buffer starting at {@code offset}.
+ * A duplicate is sliced so the source buffer's position/limit are untouched.
+ */
+ public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
+ ByteBuffer temp = buffer.duplicate();
+ temp.position(offset);
+ byteBuf = temp.slice();
+ byteBuf.limit(count);
+ this.initPos = offset;
+ this.count = count;
+ }
+
+ /** @return a slice over the bytes not yet consumed by this stream */
+ public ByteBuffer toByteBuffer() {
+ return byteBuf.slice();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!byteBuf.hasRemaining()) {
+ return -1;
+ }
+ //Workaround for unsigned byte
+ return byteBuf.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] bytes, int offset, int length) throws IOException {
+ // Per the InputStream contract, a zero-length request returns 0, not -1.
+ if (length == 0) return 0;
+ int count = Math.min(byteBuf.remaining(), length);
+ if (count == 0) return -1;
+ byteBuf.get(bytes, offset, count);
+ return count;
+ }
+
+ @Override
+ public long skip(long n) {
+ // Per the InputStream contract, a non-positive request skips nothing;
+ // previously a negative n would move the read position backwards.
+ if (n <= 0) return 0;
+ if (n > byteBuf.remaining())
+ n = byteBuf.remaining();
+ int pos = byteBuf.position();
+ byteBuf.position((int)(pos + n));
+ return n;
+ }
+
+
+ @Override
+ public int available() {
+ return byteBuf.remaining();
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
index d96a1e5..d40721a 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
@@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.apache.parquet.Log;
@@ -53,6 +54,21 @@ public class BytesUtils {
* @return
* @throws IOException
*/
+ /**
+ * Reads a little-endian int from the buffer using absolute gets at the given
+ * offset; the buffer's position is not modified.
+ *
+ * @param in buffer to read from
+ * @param offset absolute byte offset of the first (least significant) byte
+ * @return the decoded int
+ * @throws IOException declared for parity with the stream-based variant
+ */
+ public static int readIntLittleEndian(ByteBuffer in, int offset) throws IOException {
+ int ch4 = in.get(offset) & 0xff;
+ int ch3 = in.get(offset + 1) & 0xff;
+ int ch2 = in.get(offset + 2) & 0xff;
+ int ch1 = in.get(offset + 3) & 0xff;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ /**
+ * reads an int in little endian at the given position
+ * @param in
+ * @param offset
+ * @return
+ * @throws IOException
+ */
public static int readIntLittleEndian(byte[] in, int offset) throws IOException {
int ch4 = in[offset] & 0xff;
int ch3 = in[offset + 1] & 0xff;
@@ -205,6 +221,14 @@ public class BytesUtils {
out.write(value & 0x7F);
}
+ /**
+ * Writes an unsigned var-int into the buffer at its current position, one
+ * byte per 7-bit group, matching the encoding produced by
+ * {@link #writeUnsignedVarInt(int, OutputStream)} and readable by
+ * {@link #readUnsignedVarInt(InputStream)}.
+ *
+ * @param value value to encode
+ * @param dest buffer written with relative puts (1 to 5 bytes)
+ * @throws IOException declared for parity with the stream-based variant
+ */
+ public static void writeUnsignedVarInt(int value, ByteBuffer dest) throws IOException {
+ while ((value & 0xFFFFFF80) != 0L) {
+ // put() emits the single encoded byte; the previous putInt() wrote 4 bytes
+ // per group, producing output the var-int reader cannot decode and
+ // overflowing callers that allocate size() (at most 5) bytes.
+ dest.put((byte) ((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ dest.put((byte) (value & 0x7F));
+ }
+
public static void writeZigZagVarInt(int intValue, OutputStream out) throws IOException{
writeUnsignedVarInt((intValue << 1) ^ (intValue >> 31), out);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java
new file mode 100644
index 0000000..9fe4538
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link ByteBufferAllocator} producing direct (off-heap) ByteBuffers.
+ *
+ * NOTE(review): despite its name, getInstance() returns a new instance on
+ * every call rather than a singleton.
+ */
+public class DirectByteBufferAllocator implements ByteBufferAllocator{
+ public static final DirectByteBufferAllocator getInstance(){return new DirectByteBufferAllocator();}
+ public DirectByteBufferAllocator() {
+ super();
+ }
+
+ public ByteBuffer allocate(final int size) {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ @Override
+ public void release(ByteBuffer b) {
+ // Memory obtained via ByteBuffer.allocateDirect is reclaimed when the
+ // buffer object is garbage collected, so no explicit release is done here.
+ return;
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java
new file mode 100644
index 0000000..c5f475d
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.nio.ByteBuffer;
+
+/**
+ * {@link ByteBufferAllocator} producing heap-backed (array-based) ByteBuffers.
+ *
+ * NOTE(review): despite its name, getInstance() returns a new instance on
+ * every call rather than a singleton.
+ */
+public class HeapByteBufferAllocator implements ByteBufferAllocator{
+
+ public static final HeapByteBufferAllocator getInstance(){ return new HeapByteBufferAllocator();}
+
+ public HeapByteBufferAllocator() {
+ super();
+ }
+
+ public ByteBuffer allocate(final int size) {
+ return ByteBuffer.allocate(size);
+ }
+
+ public void release(ByteBuffer b) {
+ // Heap buffers are reclaimed by the garbage collector; nothing to release.
+ return;
+ }
+
+ @Override
+ public boolean isDirect() {
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
index ac334ae..40190ee 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -25,6 +25,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import org.apache.parquet.Log;
@@ -71,6 +74,15 @@ abstract public class BytesInput {
public static BytesInput from(InputStream in, int bytes) {
return new StreamBytesInput(in, bytes);
}
+
+ /**
+ * @param buffer source buffer; bytes are read from it lazily, not copied here
+ * @param offset position within the buffer to start reading from
+ * @param length number of bytes to read
+ * @return a BytesInput that will read the given bytes from the ByteBuffer
+ */
+ public static BytesInput from(ByteBuffer buffer, int offset, int length) {
+ return new ByteBufferBytesInput(buffer, offset, length);
+ }
/**
*
@@ -121,7 +133,7 @@ abstract public class BytesInput {
}
/**
- * @param arrayOut
+ * @param baos - stream to wrap into a BytesInput
* @return a BytesInput that will write the content of the buffer
*/
public static BytesInput from(ByteArrayOutputStream baos) {
@@ -166,6 +178,24 @@ abstract public class BytesInput {
/**
*
+ * @return a new ByteBuffer materializing the contents of this input
+ * @throws IOException
+ */
+ public ByteBuffer toByteBuffer() throws IOException {
+ return ByteBuffer.wrap(toByteArray());
+ }
+
+ /**
+ *
+ * @return a new InputStream materializing the contents of this input
+ * @throws IOException
+ */
+ public InputStream toInputStream() throws IOException {
+ return new ByteBufferInputStream(toByteBuffer());
+ }
+
+ /**
+ *
* @return the size in bytes that would be written
*/
abstract public long size();
@@ -258,6 +288,10 @@ abstract public class BytesInput {
BytesUtils.writeIntLittleEndian(out, intValue);
}
+ public ByteBuffer toByteBuffer() throws IOException {
+ // Absolute putInt(0, v) leaves position at 0, so the returned buffer is
+ // immediately readable from position to limit.
+ return ByteBuffer.allocate(4).putInt(0, intValue);
+ }
+
@Override
public long size() {
return 4;
@@ -278,6 +312,12 @@ abstract public class BytesInput {
BytesUtils.writeUnsignedVarInt(intValue, out);
}
+ public ByteBuffer toByteBuffer() throws IOException {
+ ByteBuffer ret = ByteBuffer.allocate((int) size());
+ BytesUtils.writeUnsignedVarInt(intValue, ret);
+ // Rewind so callers can read the encoded bytes from position 0; without the
+ // flip the buffer is returned with position == limit (nothing readable),
+ // unlike the other toByteBuffer implementations.
+ ret.flip();
+ return ret;
+ }
+
@Override
public long size() {
int s = 5 - ((Integer.numberOfLeadingZeros(intValue) + 3) / 7);
@@ -296,6 +336,10 @@ abstract public class BytesInput {
return 0;
}
+ public ByteBuffer toByteBuffer() throws IOException {
+ // Empty input: a zero-capacity buffer with nothing to read.
+ return ByteBuffer.allocate(0);
+ }
+
}
private static class CapacityBAOSBytesInput extends BytesInput {
@@ -355,11 +399,49 @@ abstract public class BytesInput {
out.write(in, offset, length);
}
+ public ByteBuffer toByteBuffer() throws IOException {
+ // Wrap shares the backing array (no copy); position/limit delimit the range.
+ return ByteBuffer.wrap(in, offset, length);
+ }
+
@Override
public long size() {
return length;
}
}
+
+ /**
+ * BytesInput backed by a range [offset, offset + length) of a ByteBuffer.
+ *
+ * NOTE(review): both methods below call byteBuf.position(offset) on the
+ * shared buffer before slicing, mutating the caller-supplied buffer's
+ * position; this is not safe if the buffer is used concurrently elsewhere.
+ */
+ private static class ByteBufferBytesInput extends BytesInput {
+
+ private final ByteBuffer byteBuf;
+ private final int length;
+ private final int offset;
+ private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
+ this.byteBuf = byteBuf;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ // Channel write handles both direct and heap buffers without an
+ // intermediate byte[] copy on the heap path.
+ final WritableByteChannel outputChannel = Channels.newChannel(out);
+ byteBuf.position(offset);
+ ByteBuffer tempBuf = byteBuf.slice();
+ tempBuf.limit(length);
+ outputChannel.write(tempBuf);
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() throws IOException {
+ byteBuf.position(offset);
+ ByteBuffer buf = byteBuf.slice();
+ buf.limit(length);
+ return buf;
+ }
+
+ @Override
+ public long size() {
+ return length;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
index 1670f9c..6155565 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -21,16 +21,17 @@ package org.apache.parquet.bytes;
import static java.lang.Math.max;
import static java.lang.Math.pow;
import static java.lang.String.format;
-import static java.lang.System.arraycopy;
import static org.apache.parquet.Preconditions.checkArgument;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.Log;
+import org.apache.parquet.OutputStreamCloseException;
/**
* Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
@@ -54,16 +55,17 @@ import org.apache.parquet.Log;
*/
public class CapacityByteArrayOutputStream extends OutputStream {
private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class);
- private static final byte[] EMPTY_SLAB = new byte[0];
+ private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]);
private int initialSlabSize;
private final int maxCapacityHint;
- private final List<byte[]> slabs = new ArrayList<byte[]>();
+ private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
- private byte[] currentSlab;
+ private ByteBuffer currentSlab;
private int currentSlabIndex;
private int bytesAllocated = 0;
private int bytesUsed = 0;
+ private ByteBufferAllocator allocator;
/**
* Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
@@ -86,38 +88,64 @@ public class CapacityByteArrayOutputStream extends OutputStream {
return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
}
+ public static CapacityByteArrayOutputStream withTargetNumSlabs(
+ int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+ return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator());
+ }
+
/**
* Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
* determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
*/
public static CapacityByteArrayOutputStream withTargetNumSlabs(
- int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+ int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) {
return new CapacityByteArrayOutputStream(
initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
- maxCapacityHint);
+ maxCapacityHint, allocator);
}
/**
* Defaults maxCapacityHint to 1MB
* @param initialSlabSize
- * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)}
+ * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
*/
@Deprecated
public CapacityByteArrayOutputStream(int initialSlabSize) {
- this(initialSlabSize, 1024 * 1024);
+ this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator());
+ }
+
+ /**
+ * Defaults maxCapacityHint to 1MB
+ * @param initialSlabSize
+ * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
+ */
+ @Deprecated
+ public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) {
+ this(initialSlabSize, 1024 * 1024, allocator);
}
/**
* @param initialSlabSize the size to make the first slab
* @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+ * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
*/
+ @Deprecated
public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
+ this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator());
+ }
+
+ /**
+ * @param initialSlabSize the size to make the first slab
+ * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+ */
+ public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) {
checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
this.initialSlabSize = initialSlabSize;
this.maxCapacityHint = maxCapacityHint;
+ this.allocator = allocator;
reset();
}
@@ -145,7 +173,7 @@ public class CapacityByteArrayOutputStream extends OutputStream {
if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize));
- this.currentSlab = new byte[nextSlabSize];
+ this.currentSlab = allocator.allocate(nextSlabSize);
this.slabs.add(currentSlab);
this.bytesAllocated += nextSlabSize;
this.currentSlabIndex = 0;
@@ -153,11 +181,12 @@ public class CapacityByteArrayOutputStream extends OutputStream {
@Override
public void write(int b) {
- if (currentSlabIndex == currentSlab.length) {
+ if (!currentSlab.hasRemaining()) {
addSlab(1);
}
- currentSlab[currentSlabIndex] = (byte) b;
+ currentSlab.put(currentSlabIndex, (byte) b);
currentSlabIndex += 1;
+ currentSlab.position(currentSlabIndex);
bytesUsed += 1;
}
@@ -168,18 +197,34 @@ public class CapacityByteArrayOutputStream extends OutputStream {
throw new IndexOutOfBoundsException(
String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
}
- if (currentSlabIndex + len >= currentSlab.length) {
- final int length1 = currentSlab.length - currentSlabIndex;
- arraycopy(b, off, currentSlab, currentSlabIndex, length1);
+ if (len >= currentSlab.remaining()) {
+ final int length1 = currentSlab.remaining();
+ currentSlab.put(b, off, length1);
+ bytesUsed += length1;
+ currentSlabIndex += length1;
final int length2 = len - length1;
addSlab(length2);
- arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2);
+ currentSlab.put(b, off + length1, length2);
currentSlabIndex = length2;
+ bytesUsed += length2;
} else {
- arraycopy(b, off, currentSlab, currentSlabIndex, len);
+ currentSlab.put(b, off, len);
currentSlabIndex += len;
+ bytesUsed += len;
+ }
+ }
+
+ private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
+ if (buf.hasArray()) {
+ out.write(buf.array(), buf.arrayOffset(), len);
+ } else {
+ // The OutputStream interface only takes a byte[], unfortunately this means that a ByteBuffer
+ // not backed by a byte array must be copied to fulfil this interface
+ byte[] copy = new byte[len];
+ buf.flip();
+ buf.get(copy);
+ out.write(copy);
}
- bytesUsed += len;
}
/**
@@ -191,10 +236,9 @@ public class CapacityByteArrayOutputStream extends OutputStream {
*/
public void writeTo(OutputStream out) throws IOException {
for (int i = 0; i < slabs.size() - 1; i++) {
- final byte[] slab = slabs.get(i);
- out.write(slab);
+ writeToOutput(out, slabs.get(i), slabs.get(i).position());
}
- out.write(currentSlab, 0, currentSlabIndex);
+ writeToOutput(out, currentSlab, currentSlabIndex);
}
/**
@@ -222,6 +266,9 @@ public class CapacityByteArrayOutputStream extends OutputStream {
// 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize));
+ for (ByteBuffer slab : slabs) {
+ allocator.release(slab);
+ }
this.slabs.clear();
this.bytesAllocated = 0;
this.bytesUsed = 0;
@@ -249,13 +296,13 @@ public class CapacityByteArrayOutputStream extends OutputStream {
long seen = 0;
for (int i = 0; i < slabs.size(); i++) {
- byte[] slab = slabs.get(i);
- if (index < seen + slab.length) {
+ ByteBuffer slab = slabs.get(i);
+ if (index < seen + slab.limit()) {
// ok found index
- slab[(int)(index-seen)] = value;
+ slab.put((int)(index-seen), value);
break;
}
- seen += slab.length;
+ seen += slab.limit();
}
}
@@ -273,4 +320,16 @@ public class CapacityByteArrayOutputStream extends OutputStream {
int getSlabCount() {
return slabs.size();
}
+
+ @Override
+ public void close() {
+ for (ByteBuffer slab : slabs) {
+ allocator.release(slab);
+ }
+ try {
+ super.close();
+ }catch(IOException e){
+ throw new OutputStreamCloseException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
index da4e92f..9d4a8a9 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -18,6 +18,9 @@
*/
package org.apache.parquet.bytes;
+import org.apache.parquet.IOExceptionUtils;
+import org.apache.parquet.ParquetRuntimeException;
+
import java.io.IOException;
import java.io.OutputStream;
@@ -210,4 +213,8 @@ public class LittleEndianDataOutputStream extends OutputStream {
writeLong(Double.doubleToLongBits(v));
}
+ // Closes the underlying stream, wrapping any IOException in an unchecked
+ // ParquetRuntimeException via IOExceptionUtils.closeQuietly.
+ // NOTE(review): consider adding @Override (OutputStream declares close()).
+ public void close() {
+ IOExceptionUtils.closeQuietly(out);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
index b9a37ad..675576c 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.column.values.bitpacking;
+import java.nio.ByteBuffer;
+
/**
* Packs and unpacks into bytes
*
@@ -71,7 +73,15 @@ public abstract class BytePacker {
* @param output the output values
* @param outPos where to write to in output
*/
- public abstract void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos);
+ public abstract void unpack8Values(final ByteBuffer input, final int inPos, final int[] output, final int outPos);
+
+ /**
+ * Compatibility API
+ */
+ @Deprecated
+ public void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos) {
+ unpack8Values(ByteBuffer.wrap(input), inPos, output, outPos);
+ }
/**
* unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos.
@@ -81,6 +91,13 @@ public abstract class BytePacker {
* @param output the output values
* @param outPos where to write to in output
*/
- public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos);
+ public abstract void unpack32Values(ByteBuffer input, int inPos, int[] output, int outPos);
+ /**
+ * Compatibility API
+ */
+ @Deprecated
+ public void unpack32Values(byte[] input, int inPos, int[] output, int outPos) {
+ unpack32Values(ByteBuffer.wrap(input), inPos, output, outPos);
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
new file mode 100644
index 0000000..1cb0304
--- /dev/null
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestBytesInput {
+
+ /**
+ * Round-trips several boundary values (min/max int, zero, small values, and
+ * a value with the high bit set) through the unsigned var-int encoding:
+ * fromUnsignedVarInt -> toByteArray -> readUnsignedVarInt.
+ */
+ @Test
+ public void testWriteInt() throws Throwable {
+ int[] testVals = {
+ Integer.MIN_VALUE,
+ Integer.MAX_VALUE,
+ 0, 100, 1000, 0xdaedbeef};
+ // each int element is autoboxed to Integer by the enhanced for loop
+ for (Integer testVal : testVals) {
+ BytesInput varInt = BytesInput.fromUnsignedVarInt(testVal);
+ byte[] rno = varInt.toByteArray();
+ int i = BytesUtils.readUnsignedVarInt(new ByteArrayInputStream(rno));
+ assertEquals((int) testVal, i);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
index b80fe40..89db198 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
@@ -63,7 +63,7 @@ public class TestCapacityByteArrayOutputStream {
}
protected CapacityByteArrayOutputStream newCapacityBAOS(int initialSize) {
- return new CapacityByteArrayOutputStream(10, 1000000);
+ return new CapacityByteArrayOutputStream(initialSize, 1000000, new HeapByteBufferAllocator());
}
@Test
@@ -129,12 +129,12 @@ public class TestCapacityByteArrayOutputStream {
assertEquals(i % (v * 3), byteArray[i]);
}
// verifying we have not created 500 * 23 / 10 slabs
- assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(),capacityByteArrayOutputStream.getSlabCount() <= 20);
+ assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(), capacityByteArrayOutputStream.getSlabCount() <= 20);
capacityByteArrayOutputStream.reset();
writeArraysOf3(capacityByteArrayOutputStream, v);
validate(capacityByteArrayOutputStream, v * 3);
// verifying we use less slabs now
- assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(),capacityByteArrayOutputStream.getSlabCount() <= 2);
+ assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(), capacityByteArrayOutputStream.getSlabCount() <= 2);
}
@Test
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
index a5ce37e..8df5f39 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.bitpacking;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.junit.Assert;
import org.junit.Test;
@@ -50,7 +51,7 @@ public class TestByteBitPacking {
byte[] packed = new byte[packer.getBitWidth() * 4];
packer.pack32Values(values, 0, packed, 0);
LOG.debug("packed: " + TestBitPacking.toString(packed));
- packer.unpack32Values(packed, 0, unpacked, 0);
+ packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
}
private int[] generateValues(int bitWidth) {
@@ -140,7 +141,7 @@ public class TestByteBitPacking {
LOG.debug("Gener. out: " + TestBitPacking.toString(packedGenerated));
Assert.assertEquals(pack.name() + " width " + i, TestBitPacking.toString(packedByLemireAsBytes), TestBitPacking.toString(packedGenerated));
- bytePacker.unpack32Values(packedByLemireAsBytes, 0, unpacked, 0);
+ bytePacker.unpack32Values(ByteBuffer.wrap(packedByLemireAsBytes), 0, unpacked, 0);
LOG.debug("Output: " + TestBitPacking.toString(unpacked));
Assert.assertArrayEquals("width " + i, values, unpacked);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
index e0c97e0..2c5fa58 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.bitpacking;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.junit.Assert;
import org.junit.Test;
@@ -64,7 +65,7 @@ public class TestLemireBitPacking {
private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
byte[] packed = new byte[packer.getBitWidth() * 4];
packer.pack32Values(values, 0, packed, 0);
- packer.unpack32Values(packed, 0, unpacked, 0);
+ packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
}
private int[] generateValues(int bitWidth) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
index 9a7c562..3d182e2 100644
--- a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
+++ b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
@@ -50,6 +50,7 @@ public class ByteBasedBitPackingGenerator {
}
FileWriter fw = new FileWriter(file);
fw.append("package org.apache.parquet.column.values.bitpacking;\n");
+ fw.append("import java.nio.ByteBuffer;\n");
fw.append("\n");
fw.append("/**\n");
if (msbFirst) {
@@ -97,8 +98,10 @@ public class ByteBasedBitPackingGenerator {
generatePack(fw, bitWidth, 4, msbFirst);
// Unpacking
- generateUnpack(fw, bitWidth, 1, msbFirst);
- generateUnpack(fw, bitWidth, 4, msbFirst);
+ generateUnpack(fw, bitWidth, 1, msbFirst, true);
+ generateUnpack(fw, bitWidth, 1, msbFirst, false);
+ generateUnpack(fw, bitWidth, 4, msbFirst, true);
+ generateUnpack(fw, bitWidth, 4, msbFirst, false);
fw.append(" }\n");
}
@@ -203,9 +206,15 @@ public class ByteBasedBitPackingGenerator {
fw.append(" }\n");
}
- private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst)
+ private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst, boolean useByteArray)
throws IOException {
- fw.append(" public final void unpack" + (batch * 8) + "Values(final byte[] in, final int inPos, final int[] out, final int outPos) {\n");
+ final String bufferDataType;
+ if (useByteArray) {
+ bufferDataType = "byte[]";
+ } else {
+ bufferDataType = "ByteBuffer";
+ }
+ fw.append(" public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, final int inPos, final int[] out, final int outPos) {\n");
if (bitWidth > 0) {
int mask = genMask(bitWidth);
for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) {
@@ -228,7 +237,14 @@ public class ByteBasedBitPackingGenerator {
} else if (shift > 0){
shiftString = "<< " + shift;
}
- fw.append(" (((((int)in[" + align(byteIndex, 2) + " + inPos]) & 255) " + shiftString + ") & " + mask + ")");
+ final String byteAccess;
+ if (useByteArray) {
+ byteAccess = "in[" + align(byteIndex, 2) + " + inPos]";
+ } else {
+ // use ByteBuffer#get(index) method
+ byteAccess = "in.get(" + align(byteIndex, 2) + " + inPos)";
+ }
+ fw.append(" (((((int)" + byteAccess + ") & 255) " + shiftString + ") & " + mask + ")");
}
fw.append(";\n");
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index a7f9d2c..2f2e932 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -92,6 +92,11 @@
<version>1.9.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>1.5.4</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index fdeb2ba..6821bbf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.Log;
import org.apache.parquet.hadoop.metadata.ColumnPath;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 6840950..8bf882f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,14 +18,14 @@
*/
package org.apache.parquet.hadoop;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -34,18 +34,64 @@ import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-class CodecFactory {
+public class CodecFactory {
+
+ protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
+ .synchronizedMap(new HashMap<String, CompressionCodec>());
+
+ private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
+ private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
- public class BytesDecompressor {
+ protected final Configuration configuration;
+ protected final int pageSize;
+
+ /**
+ * Create a new codec factory.
+ *
+ * @param configuration used to pass compression codec configuration information
+ * @param pageSize the expected page size, does not set a hard limit, currently just
+ * used to set the initial size of the output stream used when
+ * compressing a buffer. If this factory is only used to construct
+ * decompressors this parameter has no impact on the function of the factory
+ */
+ public CodecFactory(Configuration configuration, int pageSize) {
+ this.configuration = configuration;
+ this.pageSize = pageSize;
+ }
+
+ /**
+ * Create a codec factory that will provide compressors and decompressors
+ * that will work natively with ByteBuffers backed by direct memory.
+ *
+ * @param config configuration options for different compression codecs
+ * @param allocator an allocator for creating result buffers during compression
+ * and decompression, must provide buffers backed by Direct
+ * memory and return true for the isDirect() method
+ * on the ByteBufferAllocator interface
+ * @param pageSize the default page size. This does not set a hard limit on the
+ * size of buffers that can be compressed, but performance may
+ * be improved by setting it close to the expected size of buffers
+ * (in the case of parquet, pages) that will be compressed. This
+ * setting is unused in the case of decompressing data, as parquet
+ * always records the uncompressed size of a buffer. If this
+ * CodecFactory is only going to be used for decompressors, this
+ * parameter will not impact the function of the factory.
+ */
+ public static CodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+ return new DirectCodecFactory(config, allocator, pageSize);
+ }
+
+ class HeapBytesDecompressor extends BytesDecompressor {
private final CompressionCodec codec;
private final Decompressor decompressor;
- public BytesDecompressor(CompressionCodec codec) {
- this.codec = codec;
+ HeapBytesDecompressor(CompressionCodecName codecName) {
+ this.codec = getCodec(codecName);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
} else {
@@ -53,11 +99,12 @@ class CodecFactory {
}
}
+ @Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
final BytesInput decompressed;
if (codec != null) {
decompressor.reset();
- InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+ InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
decompressed = BytesInput.from(is, uncompressedSize);
} else {
decompressed = bytes;
@@ -65,7 +112,13 @@ class CodecFactory {
return decompressed;
}
- private void release() {
+ @Override
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException {
+ ByteBuffer decompressed = decompress(BytesInput.from(input, 0, input.remaining()), uncompressedSize).toByteBuffer();
+ output.put(decompressed);
+ }
+
+ protected void release() {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
@@ -78,16 +131,16 @@ class CodecFactory {
* @author Julien Le Dem
*
*/
- public static class BytesCompressor {
+ class HeapBytesCompressor extends BytesCompressor {
private final CompressionCodec codec;
private final Compressor compressor;
private final ByteArrayOutputStream compressedOutBuffer;
private final CompressionCodecName codecName;
- public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
+ HeapBytesCompressor(CompressionCodecName codecName) {
this.codecName = codecName;
- this.codec = codec;
+ this.codec = getCodec(codecName);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
@@ -97,6 +150,7 @@ class CodecFactory {
}
}
+ @Override
public BytesInput compress(BytesInput bytes) throws IOException {
final BytesInput compressedBytes;
if (codec == null) {
@@ -116,7 +170,8 @@ class CodecFactory {
return compressedBytes;
}
- private void release() {
+ @Override
+ protected void release() {
if (compressor != null) {
CodecPool.returnCompressor(compressor);
}
@@ -128,60 +183,58 @@ class CodecFactory {
}
- private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
- private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
- private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
- private final Configuration configuration;
+ public BytesCompressor getCompressor(CompressionCodecName codecName) {
+ BytesCompressor comp = compressors.get(codecName);
+ if (comp == null) {
+ comp = createCompressor(codecName);
+ compressors.put(codecName, comp);
+ }
+ return comp;
+ }
- public CodecFactory(Configuration configuration) {
- this.configuration = configuration;
+ public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+ BytesDecompressor decomp = decompressors.get(codecName);
+ if (decomp == null) {
+ decomp = createDecompressor(codecName);
+ decompressors.put(codecName, decomp);
+ }
+ return decomp;
+ }
+
+ protected BytesCompressor createCompressor(CompressionCodecName codecName) {
+ return new HeapBytesCompressor(codecName);
+ }
+
+ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+ return new HeapBytesDecompressor(codecName);
}
/**
*
- * @param codecName the requested codec
+ * @param codecName
+ * the requested codec
* @return the corresponding hadoop codec. null if UNCOMPRESSED
*/
- private CompressionCodec getCodec(CompressionCodecName codecName) {
+ protected CompressionCodec getCodec(CompressionCodecName codecName) {
String codecClassName = codecName.getHadoopCompressionCodecClassName();
if (codecClassName == null) {
return null;
}
- CompressionCodec codec = codecByName.get(codecClassName);
+ CompressionCodec codec = CODEC_BY_NAME.get(codecClassName);
if (codec != null) {
return codec;
}
try {
Class<?> codecClass = Class.forName(codecClassName);
- codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- codecByName.put(codecClassName, codec);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+ CODEC_BY_NAME.put(codecClassName, codec);
return codec;
} catch (ClassNotFoundException e) {
throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
}
}
- public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
- BytesCompressor comp = compressors.get(codecName);
- if (comp == null) {
- CompressionCodec codec = getCodec(codecName);
- comp = new BytesCompressor(codecName, codec, pageSize);
- compressors.put(codecName, comp);
- }
- return comp;
- }
-
- public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
- BytesDecompressor decomp = decompressors.get(codecName);
- if (decomp == null) {
- CompressionCodec codec = getCodec(codecName);
- decomp = new BytesDecompressor(codec);
- decompressors.put(codecName, decomp);
- }
- return decomp;
- }
-
public void release() {
for (BytesCompressor compressor : compressors.values()) {
compressor.release();
@@ -192,4 +245,16 @@ class CodecFactory {
}
decompressors.clear();
}
+
+ public static abstract class BytesCompressor {
+ public abstract BytesInput compress(BytesInput bytes) throws IOException;
+ public abstract CompressionCodecName getCodecName();
+ protected abstract void release();
+ }
+
+ public static abstract class BytesDecompressor {
+ public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
+ public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException;
+ protected abstract void release();
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index b6934c2..af06747 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY