You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@parquet.apache.org by ju...@apache.org on 2015/11/04 18:57:35 UTC
[3/4] parquet-mr git commit: PARQUET-77: ByteBuffer use in read and write paths
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
index eb9fdd9..86edd79 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -36,9 +36,11 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
@@ -92,17 +94,28 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/* dictionary encoded values */
protected IntList encodedValues = new IntList();
+ /** indicates if this is the first page being processed */
+ protected boolean firstPage = true;
+
+ protected ByteBufferAllocator allocator;
+ /* Track the list of writers used so they can be appropriately closed when necessary
+ (currently used for off-heap memory which is not garbage collected) */
+ private List<RunLengthBitPackingHybridEncoder> encoders = new ArrayList<RunLengthBitPackingHybridEncoder>();
+
/**
* @param maxDictionaryByteSize
*/
- protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+ protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ this.allocator = allocator;
this.maxDictionaryByteSize = maxDictionaryByteSize;
this.encodingForDataPage = encodingForDataPage;
this.encodingForDictionaryPage = encodingForDictionaryPage;
}
- protected DictionaryPage dictPage(ValuesWriter dictionaryEncoder) {
- return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
+ protected DictionaryPage dictPage(ValuesWriter dictPageWriter) {
+ DictionaryPage ret = new DictionaryPage(dictPageWriter.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
+ dictPageWriter.close();
+ return ret;
}
@Override
@@ -147,12 +160,12 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
int maxDicId = getDictionarySize() - 1;
if (DEBUG) LOG.debug("max dic id " + maxDicId);
int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
-
int initialSlabSize =
CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
RunLengthBitPackingHybridEncoder encoder =
- new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
+ new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize, this.allocator);
+ encoders.add(encoder);
IntIterator iterator = encodedValues.iterator();
try {
while (iterator.hasNext()) {
@@ -179,10 +192,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
@Override
public void reset() {
+ close();
encodedValues = new IntList();
}
@Override
+ public void close() {
+ encodedValues = null;
+ for (RunLengthBitPackingHybridEncoder encoder : encoders) {
+ encoder.close();
+ }
+ encoders.clear();
+ }
+
+ @Override
public void resetDictionary() {
lastUsedDictionaryByteSize = 0;
lastUsedDictionarySize = 0;
@@ -225,8 +248,8 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/**
* @param maxDictionaryByteSize
*/
- public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
- super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
binaryDictionaryContent.defaultReturnValue(-1);
}
@@ -243,10 +266,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
}
@Override
- public DictionaryPage createDictionaryPage() {
+ public DictionaryPage toDictPageAndClose() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
- PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -294,10 +317,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/**
* @param maxDictionaryByteSize
- * @param initialSize
*/
- public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
- super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
this.length = length;
}
@@ -313,10 +335,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
}
@Override
- public DictionaryPage createDictionaryPage() {
+ public DictionaryPage toDictPageAndClose() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
- FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -339,10 +361,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/**
* @param maxDictionaryByteSize
- * @param initialSize
*/
- public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
- super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
longDictionaryContent.defaultReturnValue(-1);
}
@@ -358,10 +379,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
}
@Override
- public DictionaryPage createDictionaryPage() {
+ public DictionaryPage toDictPageAndClose() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
- PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
LongIterator longIterator = longDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -411,10 +432,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/**
* @param maxDictionaryByteSize
- * @param initialSize
*/
- public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
- super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
doubleDictionaryContent.defaultReturnValue(-1);
}
@@ -430,10 +450,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
}
@Override
- public DictionaryPage createDictionaryPage() {
+ public DictionaryPage toDictPageAndClose() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
- PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -483,10 +503,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/**
* @param maxDictionaryByteSize
- * @param initialSize
*/
- public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
- super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
intDictionaryContent.defaultReturnValue(-1);
}
@@ -502,10 +521,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
}
@Override
- public DictionaryPage createDictionaryPage() {
+ public DictionaryPage toDictPageAndClose() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
- PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -555,10 +574,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
/**
* @param maxDictionaryByteSize
- * @param initialSize
*/
- public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
- super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+ public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+ super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
floatDictionaryContent.defaultReturnValue(-1);
}
@@ -574,10 +592,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
}
@Override
- public DictionaryPage createDictionaryPage() {
+ public DictionaryPage toDictPageAndClose() {
if (lastUsedDictionarySize > 0) {
// return a dictionary only if we actually used it
- PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+ PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
FloatIterator floatIterator = floatDictionaryContent.keySet().iterator();
// write only the part of the dict that we used
for (int i = 0; i < lastUsedDictionarySize; i++) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
index e671310..0fa6cc6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -23,6 +23,7 @@ import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.column.Encoding.PLAIN;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.Dictionary;
@@ -86,9 +87,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException {
super(dictionaryPage);
- final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ final ByteBuffer dictionaryBytes = dictionaryPage.getBytes().toByteBuffer();
binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()];
- int offset = 0;
+ // offsets are absolute: start at the buffer's position, which need not be 0 for a wrapped/sliced buffer
+ int offset = dictionaryBytes.position();
if (length == null) {
// dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes
for (int i = 0; i < binaryDictionaryContent.length; i++) {
@@ -96,7 +98,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
// read the length
offset += 4;
// wrap the content in a binary
- binaryDictionaryContent[i] = Binary.fromConstantByteArray(dictionaryBytes, offset, len);
+ binaryDictionaryContent[i] = Binary.fromConstantByteBuffer(dictionaryBytes, offset, len);
// increment to the next value
offset += len;
}
@@ -106,7 +108,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
"Invalid byte array length: " + length);
for (int i = 0; i < binaryDictionaryContent.length; i++) {
// wrap the content in a Binary
- binaryDictionaryContent[i] = Binary.fromConstantByteArray(
+ binaryDictionaryContent[i] = Binary.fromConstantByteBuffer(
dictionaryBytes, offset, length);
// increment to the next value
offset += length;
@@ -148,10 +150,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
LongPlainValuesReader longReader = new LongPlainValuesReader();
- longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
for (int i = 0; i < longDictionaryContent.length; i++) {
longDictionaryContent[i] = longReader.readLong();
}
@@ -191,10 +193,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
- doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
for (int i = 0; i < doubleDictionaryContent.length; i++) {
doubleDictionaryContent[i] = doubleReader.readDouble();
}
@@ -234,10 +236,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
- intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
for (int i = 0; i < intDictionaryContent.length; i++) {
intDictionaryContent[i] = intReader.readInteger();
}
@@ -277,10 +279,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+ final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
- floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+ floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
for (int i = 0; i < floatDictionaryContent.length; i++) {
floatDictionaryContent[i] = floatReader.readFloat();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
index f66c7c9..19fed7d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
@@ -97,11 +97,17 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e
currentWriter.reset();
}
- public DictionaryPage createDictionaryPage() {
+ @Override
+ public void close() {
+ initialWriter.close();
+ fallBackWriter.close();
+ }
+
+ public DictionaryPage toDictPageAndClose() {
if (initialUsedAndHadDictionary) {
- return initialWriter.createDictionaryPage();
+ return initialWriter.toDictPageAndClose();
} else {
- return currentWriter.createDictionaryPage();
+ return currentWriter.toDictPageAndClose();
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index 4346e02..26f5e29 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.plain;
import static org.apache.parquet.Log.DEBUG;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesUtils;
@@ -30,7 +31,7 @@ import org.apache.parquet.io.api.Binary;
public class BinaryPlainValuesReader extends ValuesReader {
private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class);
- private byte[] in;
+ private ByteBuffer in;
private int offset;
@Override
@@ -39,7 +40,7 @@ public class BinaryPlainValuesReader extends ValuesReader {
int length = BytesUtils.readIntLittleEndian(in, offset);
int start = offset + 4;
offset = start + length;
- return Binary.fromConstantByteArray(in, start, length);
+ return Binary.fromConstantByteBuffer(in, start, length);
} catch (IOException e) {
throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
} catch (RuntimeException e) {
@@ -60,11 +61,10 @@ public class BinaryPlainValuesReader extends ValuesReader {
}
@Override
- public void initFromPage(int valueCount, byte[] in, int offset)
+ public void initFromPage(int valueCount, ByteBuffer in, int offset)
throws IOException {
- if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
this.in = in;
this.offset = offset;
}
-
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
index 31e711f..c330490 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.Log.DEBUG;
import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.Log;
import org.apache.parquet.column.values.ValuesReader;
@@ -62,8 +63,8 @@ public class BooleanPlainValuesReader extends ValuesReader {
- * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int)
+ * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
*/
@Override
- public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
- if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+ public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
this.in.initFromPage(valueCount, in, offset);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
index 78920f0..c3e88ea 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.column.values.plain;
+
import static org.apache.parquet.column.Encoding.PLAIN;
import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
import org.apache.parquet.bytes.BytesInput;
@@ -61,6 +62,11 @@ public class BooleanPlainValuesWriter extends ValuesWriter {
}
@Override
+ public void close() {
+ bitPackingWriter.close();
+ }
+
+ @Override
public long getAllocatedSize() {
return bitPackingWriter.getAllocatedSize();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 098a486..8496e7e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.values.plain;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.Log;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
@@ -33,7 +34,7 @@ import static org.apache.parquet.Log.DEBUG;
*/
public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class);
- private byte[] in;
+ private ByteBuffer in;
private int offset;
private int length;
@@ -46,7 +47,7 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
try {
int start = offset;
offset = start + length;
- return Binary.fromConstantByteArray(in, start, length);
+ return Binary.fromConstantByteBuffer(in, start, length);
} catch (RuntimeException e) {
throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
}
@@ -58,9 +59,9 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
}
@Override
- public void initFromPage(int valueCount, byte[] in, int offset)
+ public void initFromPage(int valueCount, ByteBuffer in, int offset)
throws IOException {
- if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
this.in = in;
this.offset = offset;
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
index 986ae0b..6ab2dea 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.plain;
import java.io.IOException;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -40,10 +41,13 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;
private int length;
+ private ByteBufferAllocator allocator;
+
- public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) {
+ public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize, ByteBufferAllocator allocator) {
this.length = length;
- this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+ this.allocator = allocator;
+ this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator);
this.out = new LittleEndianDataOutputStream(arrayOut);
}
@@ -82,6 +86,11 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
}
@Override
+ public void close() {
+ arrayOut.close();
+ }
+
+ @Override
public long getAllocatedSize() {
return arrayOut.getCapacity();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index bd938ee..c8fb303 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -20,9 +20,10 @@ package org.apache.parquet.column.values.plain;
import static org.apache.parquet.Log.DEBUG;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.LittleEndianDataInputStream;
import org.apache.parquet.column.values.ValuesReader;
@@ -41,12 +42,16 @@ abstract public class PlainValuesReader extends ValuesReader {
/**
* {@inheritDoc}
- * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int)
+ * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
*/
@Override
- public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
- if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
- this.in = new LittleEndianDataInputStream(new ByteArrayInputStream(in, offset, in.length - offset));
+ public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+ if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
+ this.in = new LittleEndianDataInputStream(toInputStream(in, offset));
+ }
+
+ private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) {
+ return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset);
}
public static class DoublePlainValuesReader extends PlainValuesReader {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
index f33bd81..add5495 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.plain;
import java.io.IOException;
import java.nio.charset.Charset;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -44,8 +45,8 @@ public class PlainValuesWriter extends ValuesWriter {
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;
- public PlainValuesWriter(int initialSize, int pageSize) {
- arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+ public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
+ arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
out = new LittleEndianDataOutputStream(arrayOut);
}
@@ -126,6 +127,12 @@ public class PlainValuesWriter extends ValuesWriter {
}
@Override
+ public void close() {
+ arrayOut.close();
+ out.close();
+ }
+
+ @Override
public long getAllocatedSize() {
return arrayOut.getCapacity();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 38eb354..1280e8d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -20,10 +20,12 @@ package org.apache.parquet.column.values.rle;
import static org.apache.parquet.Log.DEBUG;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.Log;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
@@ -43,14 +45,14 @@ public class RunLengthBitPackingHybridDecoder {
private final int bitWidth;
private final BytePacker packer;
- private final ByteArrayInputStream in;
+ private final InputStream in;
private MODE mode;
private int currentCount;
private int currentValue;
private int[] currentBuffer;
- public RunLengthBitPackingHybridDecoder(int bitWidth, ByteArrayInputStream in) {
+ public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth);
Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
index 9d37574..001d3f6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.rle;
import java.io.IOException;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.Log;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesInput;
@@ -116,7 +117,7 @@ public class RunLengthBitPackingHybridEncoder {
private boolean toBytesCalled;
- public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize) {
+ public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
if (DEBUG) {
LOG.debug(String.format("Encoding: RunLengthBitPackingHybridEncoder with "
+ "bithWidth: %d initialCapacity %d", bitWidth, initialCapacity));
@@ -125,7 +126,7 @@ public class RunLengthBitPackingHybridEncoder {
Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
this.bitWidth = bitWidth;
- this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+ this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
this.packBuffer = new byte[bitWidth];
this.bufferedValues = new int[8];
this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
@@ -281,6 +282,11 @@ public class RunLengthBitPackingHybridEncoder {
reset(true);
}
+ public void close() {
+ reset(false);
+ baos.close();
+ }
+
public long getBufferedSize() {
return baos.size();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index bd4e11d..4ccf2b8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -18,9 +18,10 @@
*/
package org.apache.parquet.column.values.rle;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
@@ -41,8 +42,8 @@ public class RunLengthBitPackingHybridValuesReader extends ValuesReader {
}
@Override
- public void initFromPage(int valueCountL, byte[] page, int offset) throws IOException {
- ByteArrayInputStream in = new ByteArrayInputStream(page, offset, page.length - offset);
+ public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException {
+ ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset);
int length = BytesUtils.readIntLittleEndian(in);
decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index bccfd34..14ef161 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.rle;
import java.io.IOException;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.Ints;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
@@ -32,8 +33,8 @@ import org.apache.parquet.io.ParquetEncodingException;
public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
private final RunLengthBitPackingHybridEncoder encoder;
- public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize) {
- this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize);
+ public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+ this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator);
}
@Override
@@ -82,6 +83,11 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
}
@Override
+ public void close() {
+ encoder.close();
+ }
+
+ @Override
public String memUsageString(String prefix) {
return String.format("%s RunLengthBitPackingHybrid %d bytes", prefix, getAllocatedSize());
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index f88d740..ff833ec 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -62,12 +62,16 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
abstract boolean equals(byte[] bytes, int offset, int length);
+ abstract boolean equals(ByteBuffer bytes, int offset, int length);
+
abstract boolean equals(Binary other);
abstract public int compareTo(Binary other);
abstract int compareTo(byte[] bytes, int offset, int length);
+ abstract int compareTo(ByteBuffer bytes, int offset, int length);
+
abstract public ByteBuffer toByteBuffer();
@Override
@@ -174,6 +178,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
@Override
+ boolean equals(ByteBuffer bytes, int otherOffset, int otherLength) {
+ return Binary.equals(value, offset, length, bytes, otherOffset, otherLength);
+ }
+
+ @Override
public int compareTo(Binary other) {
return other.compareTo(value, offset, length);
}
@@ -184,6 +193,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
@Override
+ int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
+ return Binary.compareByteArrayToByteBuffer(value, offset, length, bytes, otherOffset, otherLength);
+ }
+
+ @Override
public ByteBuffer toByteBuffer() {
return ByteBuffer.wrap(value, offset, length);
}
@@ -292,6 +306,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
@Override
+ boolean equals(ByteBuffer bytes, int otherOffset, int otherLength) {
+ return Binary.equals(value, 0, value.length, bytes, otherOffset, otherLength);
+ }
+
+ @Override
public int compareTo(Binary other) {
return other.compareTo(value, 0, value.length);
}
@@ -302,6 +321,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
@Override
+ int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
+ return Binary.compareByteArrayToByteBuffer(value, 0, value.length, bytes, otherOffset, otherLength);
+ }
+
+ @Override
public ByteBuffer toByteBuffer() {
return ByteBuffer.wrap(value);
}
@@ -330,36 +354,58 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
private static class ByteBufferBackedBinary extends Binary {
- private transient ByteBuffer value;
- private transient byte[] cachedBytes;
+ private ByteBuffer value;
+ private byte[] cachedBytes;
+ private final int offset;
+ private final int length;
- public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
+ public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean isBackingBytesReused) {
this.value = value;
+ this.offset = offset;
+ this.length = length;
this.isBackingBytesReused = isBackingBytesReused;
}
@Override
public String toStringUsingUTF8() {
- return UTF8.decode(value).toString();
+ int limit = value.limit();
+ int position = value.position();
+ value.limit(offset + length);
+ value.position(offset);
+ // Save limit AND position before touching either: lowering the limit below the current
+ // position clamps the position, so reading position() afterwards would restore the wrong
+ // value. Adjusting in place (and restoring below) avoids allocating a slice per call.
+ String ret = UTF8.decode(value).toString();
+ value.limit(limit);
+ value.position(position);
+ return ret;
}
@Override
public int length() {
- return value.remaining();
+ return length;
}
@Override
public void writeTo(OutputStream out) throws IOException {
- // TODO: should not have to materialize those bytes
- out.write(getBytesUnsafe());
+ if (value.hasArray()) {
+ out.write(value.array(), value.arrayOffset() + offset, length);
+ } else {
+ out.write(getBytesUnsafe(), 0, length);
+ }
}
@Override
public byte[] getBytes() {
- byte[] bytes = new byte[value.remaining()];
+ byte[] bytes = new byte[length];
- value.mark();
- value.get(bytes).reset();
+ int limit = value.limit();
+ value.limit(offset + length);
+ int position = value.position();
+ value.position(offset);
+ value.get(bytes);
+ value.limit(limit);
+ value.position(position);
if (!isBackingBytesReused) { // backing buffer might change
cachedBytes = bytes;
}
@@ -375,60 +421,68 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
public Binary slice(int start, int length) {
return Binary.fromConstantByteArray(getBytesUnsafe(), start, length);
}
-
@Override
public int hashCode() {
if (value.hasArray()) {
- return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining());
+ return Binary.hashCode(value.array(), value.arrayOffset() + offset, length);
+ } else {
+ return Binary.hashCode(value, offset, length);
}
- byte[] bytes = getBytesUnsafe();
- return Binary.hashCode(bytes, 0, bytes.length);
}
@Override
boolean equals(Binary other) {
if (value.hasArray()) {
- return other.equals(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining());
+ return other.equals(value.array(), value.arrayOffset() + offset, length);
+ } else {
+ return other.equals(value, offset, length);
}
- byte[] bytes = getBytesUnsafe();
- return other.equals(bytes, 0, bytes.length);
}
@Override
boolean equals(byte[] other, int otherOffset, int otherLength) {
if (value.hasArray()) {
- return Binary.equals(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
+ return Binary.equals(value.array(), value.arrayOffset() + offset, length, other, otherOffset, otherLength);
+ } else {
+ return Binary.equals(other, otherOffset, otherLength, value, offset, length);
}
- byte[] bytes = getBytesUnsafe();
- return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+ }
+
+ @Override
+ boolean equals(ByteBuffer otherBytes, int otherOffset, int otherLength) {
+ return Binary.equals(value, offset, length, otherBytes, otherOffset, otherLength); // this binary starts at offset, not 0
}
@Override
public int compareTo(Binary other) {
if (value.hasArray()) {
- return other.compareTo(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining());
+ return other.compareTo(value.array(), value.arrayOffset() + offset, length);
+ } else {
+ return other.compareTo(value, offset, length);
}
- byte[] bytes = getBytesUnsafe();
- return other.compareTo(bytes, 0, bytes.length);
}
@Override
int compareTo(byte[] other, int otherOffset, int otherLength) {
if (value.hasArray()) {
- return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
+ return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + offset, length,
+ other, otherOffset, otherLength);
+ } else {
+ return Binary.compareByteBufferToByteArray(value, offset, length, other, otherOffset, otherLength);
}
- byte[] bytes = getBytesUnsafe();
- return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+ }
+
+ @Override
+ int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
+ return Binary.compareTwoByteBuffers(value, offset, length, bytes, otherOffset, otherLength);
}
@Override
public ByteBuffer toByteBuffer() {
- return value;
+ ByteBuffer ret = value.duplicate(); // offset is absolute in value; slice() would rebase it at position()
+ ret.limit(offset + length);
+ ret.position(offset);
+ return ret;
}
@Override
@@ -456,12 +510,20 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
+ public static Binary fromReusedByteBuffer(final ByteBuffer value, int offset, int length) {
+ return new ByteBufferBackedBinary(value, offset, length, true);
+ }
+
+ public static Binary fromConstantByteBuffer(final ByteBuffer value, int offset, int length) {
+ return new ByteBufferBackedBinary(value, offset, length, false);
+ }
+
public static Binary fromReusedByteBuffer(final ByteBuffer value) {
- return new ByteBufferBackedBinary(value, true);
+ return new ByteBufferBackedBinary(value, value.position(), value.remaining(), true);
}
public static Binary fromConstantByteBuffer(final ByteBuffer value) {
- return new ByteBufferBackedBinary(value, false);
+ return new ByteBufferBackedBinary(value, value.position(), value.remaining(), false);
}
@Deprecated
@@ -492,6 +554,39 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return result;
}
+ private static final int hashCode(ByteBuffer buf, int offset, int length) {
+ int result = 1;
+ for (int i = offset; i < offset + length; i++) {
+ byte b = buf.get(i);
+ result = 31 * result + b;
+ }
+ return result;
+ }
+
+ private static final boolean equals(ByteBuffer buf1, int offset1, int length1, ByteBuffer buf2, int offset2, int length2) {
+ if (buf1 == null && buf2 == null) return true;
+ if (buf1 == null || buf2 == null) return false;
+ if (length1 != length2) return false;
+ for (int i = 0; i < length1; i++) {
+ if (buf1.get(i + offset1) != buf2.get(i + offset2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static final boolean equals(byte[] array1, int offset1, int length1, ByteBuffer buf, int offset2, int length2) {
+ if (array1 == null && buf == null) return true;
+ if (array1 == null || buf == null) return false;
+ if (length1 != length2) return false;
+ for (int i = 0; i < length1; i++) {
+ if (array1[i + offset1] != buf.get(i + offset2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* @see {@link Arrays#equals(byte[], byte[])}
* @param array1
@@ -515,6 +610,47 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return true;
}
+ private static final int compareByteBufferToByteArray(ByteBuffer buf, int offset1, int length1,
+ byte[] array, int offset2, int length2) {
+ return -1 * Binary.compareByteArrayToByteBuffer(array, offset2, length2, buf, offset1, length1); // operands swapped, so offsets/lengths swap and the sign flips
+ }
+
+ private static final int compareByteArrayToByteBuffer(byte[] array1, int offset1, int length1,
+ ByteBuffer buf, int offset2, int length2) {
+ if (array1 == null && buf == null) return 0;
+ int min_length = (length1 < length2) ? length1 : length2;
+ for (int i = 0; i < min_length; i++) {
+ if (array1[i + offset1] < buf.get(i + offset2)) {
+ return 1;
+ }
+ if (array1[i + offset1] > buf.get(i + offset2)) {
+ return -1;
+ }
+ }
+ // check remainder
+ if (length1 == length2) { return 0; }
+ else if (length1 < length2) { return 1;}
+ else { return -1; }
+ }
+
+ private static final int compareTwoByteBuffers(ByteBuffer buf1, int offset1, int length1,
+ ByteBuffer buf2, int offset2, int length2) {
+ if (buf1 == null && buf2 == null) return 0;
+ int min_length = (length1 < length2) ? length1 : length2;
+ for (int i = 0; i < min_length; i++) {
+ if (buf1.get(i + offset1) < buf2.get(i + offset2)) {
+ return 1;
+ }
+ if (buf1.get(i + offset1) > buf2.get(i + offset2)) {
+ return -1;
+ }
+ }
+ // check remainder
+ if (length1 == length2) { return 0; }
+ else if (length1 < length2) { return 1;}
+ else { return -1; }
+ }
+
private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1,
byte[] array2, int offset2, int length2) {
if (array1 == null && array2 == null) return 0;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 7988f4a..5c6e460 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -333,10 +333,9 @@ public final class PrimitiveType extends Type {
* @param decimalMeta (optional) metadata about the decimal type
* @param id the id of the field
*/
- PrimitiveType(
- Repetition repetition, PrimitiveTypeName primitive,
- int length, String name, OriginalType originalType,
- DecimalMetadata decimalMeta, ID id) {
+ public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+ int length, String name, OriginalType originalType,
+ DecimalMetadata decimalMeta, ID id) {
super(name, repetition, originalType, id);
this.primitive = primitive;
this.length = length;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index a1820e6..6792361 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.parquet.Version;
import org.apache.parquet.VersionParser;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.junit.Test;
import org.apache.parquet.column.ColumnDescriptor;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
index 0327948..9bb2759 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
@@ -36,8 +36,10 @@ import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.PrimitiveType;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -74,16 +76,20 @@ public class TestCorruptDeltaByteArrays {
assertFalse(CorruptDeltaByteArrays.requiresSequentialReads(fixed, Encoding.DELTA_BYTE_ARRAY));
}
+ private DeltaByteArrayWriter getDeltaByteArrayWriter() {
+ return new DeltaByteArrayWriter(10, 100, new HeapByteBufferAllocator());
+ }
+
@Test
public void testReassemblyWithCorruptPage() throws Exception {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+ DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
String lastValue = null;
for (int i = 0; i < 10; i += 1) {
lastValue = str(i);
writer.writeBytes(Binary.fromString(lastValue));
}
- byte[] firstPageBytes = writer.getBytes().toByteArray();
+ ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer();
writer.reset(); // sets previous to new byte[0]
corruptWriter(writer, lastValue);
@@ -91,7 +97,7 @@ public class TestCorruptDeltaByteArrays {
for (int i = 10; i < 20; i += 1) {
writer.writeBytes(Binary.fromString(str(i)));
}
- byte[] corruptPageBytes = writer.getBytes().toByteArray();
+ ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer();
DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
firstPageReader.initFromPage(10, firstPageBytes, 0);
@@ -119,19 +125,19 @@ public class TestCorruptDeltaByteArrays {
@Test
public void testReassemblyWithoutCorruption() throws Exception {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+ DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
for (int i = 0; i < 10; i += 1) {
writer.writeBytes(Binary.fromString(str(i)));
}
- byte[] firstPageBytes = writer.getBytes().toByteArray();
+ ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer();
writer.reset(); // sets previous to new byte[0]
for (int i = 10; i < 20; i += 1) {
writer.writeBytes(Binary.fromString(str(i)));
}
- byte[] secondPageBytes = writer.getBytes().toByteArray();
+ ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
firstPageReader.initFromPage(10, firstPageBytes, 0);
@@ -150,19 +156,19 @@ public class TestCorruptDeltaByteArrays {
@Test
public void testOldReassemblyWithoutCorruption() throws Exception {
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+ DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
for (int i = 0; i < 10; i += 1) {
writer.writeBytes(Binary.fromString(str(i)));
}
- byte[] firstPageBytes = writer.getBytes().toByteArray();
+ ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer();
writer.reset(); // sets previous to new byte[0]
for (int i = 10; i < 20; i += 1) {
writer.writeBytes(Binary.fromString(str(i)));
}
- byte[] secondPageBytes = writer.getBytes().toByteArray();
+ ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
firstPageReader.initFromPage(10, firstPageBytes, 0);
@@ -185,15 +191,16 @@ public class TestCorruptDeltaByteArrays {
MemPageStore pages = new MemPageStore(0);
PageWriter memWriter = pages.getPageWriter(column);
+ ParquetProperties parquetProps = new ParquetProperties(0, ParquetProperties.WriterVersion.PARQUET_1_0, false, new HeapByteBufferAllocator());
+
// get generic repetition and definition level bytes to use for pages
- ValuesWriter rdValues = ParquetProperties
- .getColumnDescriptorValuesWriter(0, 10, 100);
+ ValuesWriter rdValues = parquetProps.getColumnDescriptorValuesWriter(0, 10, 100);
for (int i = 0; i < 10; i += 1) {
rdValues.writeInteger(0);
}
// use a byte array backed BytesInput because it is reused
BytesInput rd = BytesInput.from(rdValues.getBytes().toByteArray());
- DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+ DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
String lastValue = null;
List<String> values = new ArrayList<String>();
for (int i = 0; i < 10; i += 1) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index 135123f..044fe2a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.mem;
import static org.junit.Assert.assertEquals;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.junit.Test;
import org.apache.parquet.Log;
@@ -160,6 +161,6 @@ public class TestMemColumn {
}
private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
- return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+ return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
index d5bfe22..ddab636 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
@@ -109,5 +109,4 @@ public class MemPageWriter implements PageWriter {
return String.format("%s %,d bytes", prefix, memSize);
}
-
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
index c9a62b4..8caad2b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.values;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.parquet.io.api.Binary;
@@ -61,7 +62,7 @@ public class Utils {
public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length)
throws IOException {
Binary[] bins = new Binary[length];
- reader.initFromPage(length, data, 0);
+ reader.initFromPage(length, ByteBuffer.wrap(data), 0);
for(int i=0; i < length; i++) {
bins[i] = reader.readBytes();
}
@@ -76,7 +77,7 @@ public class Utils {
public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length)
throws IOException {
int[] ints = new int[length];
- reader.initFromPage(length, data, offset);
+ reader.initFromPage(length, ByteBuffer.wrap(data), offset);
for(int i=0; i < length; i++) {
ints[i] = reader.readInteger();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
index e74e787..2733b72 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.bitpacking;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
@@ -87,7 +88,7 @@ public class BitPackingPerfTest {
System.out.print(" no gc <");
for (int k = 0; k < N; k++) {
long t2 = System.nanoTime();
- r.initFromPage(result.length, bytes, 0);
+ r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0);
for (int i = 0; i < result.length; i++) {
result[i] = r.readInteger();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
index 2f311ec..aef259c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -23,9 +23,11 @@ import static org.junit.Assert.assertEquals;
import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.Log;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.ValuesWriter;
@@ -172,7 +174,7 @@ public class TestBitPackingColumn {
LOG.debug("bytes: " + TestBitPacking.toString(bytes));
assertEquals(type.toString(), expected, TestBitPacking.toString(bytes));
ValuesReader r = type.getReader(bound);
- r.initFromPage(vals.length, bytes, 0);
+ r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0);
int[] result = new int[vals.length];
for (int i = 0; i < result.length; i++) {
result[i] = r.readInteger();
@@ -188,7 +190,7 @@ public class TestBitPackingColumn {
return new BitPackingValuesReader(bound);
}
public ValuesWriter getWriter(final int bound) {
- return new BitPackingValuesWriter(bound, 32*1024, 64*1024);
+ return new BitPackingValuesWriter(bound, 32*1024, 64*1024, new DirectByteBufferAllocator());
}
}
,
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
index ba979b7..d1e43d2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
@@ -23,11 +23,14 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+
public class TestBoundedColumns {
private final Random r = new Random(42L);
@@ -54,7 +57,7 @@ public class TestBoundedColumns {
}
private void compareOutput(int bound, int[] ints, String[] result) throws IOException {
- BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024);
+ BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024, new DirectByteBufferAllocator());
for (int i : ints) {
bicw.writeInteger(i);
}
@@ -63,7 +66,7 @@ public class TestBoundedColumns {
byte[] byteArray = bicw.getBytes().toByteArray();
assertEquals(concat(result), toBinaryString(byteArray, 4));
BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound);
- bicr.initFromPage(1, byteArray, 0);
+ bicr.initFromPage(1, ByteBuffer.wrap(byteArray), 0);
String expected = "";
String got = "";
for (int i : ints) {
@@ -123,7 +126,7 @@ public class TestBoundedColumns {
ByteArrayOutputStream tmp = new ByteArrayOutputStream();
int[] stream = new int[totalValuesInStream];
- BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024);
+ BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024, new DirectByteBufferAllocator());
int idx = 0;
for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
int next = 0;
@@ -155,7 +158,7 @@ public class TestBoundedColumns {
idx = 0;
int offset = 0;
for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
- bicr.initFromPage(1, input, offset);
+ bicr.initFromPage(1, ByteBuffer.wrap(input), offset);
offset = bicr.getNextOffset();
for (int i = 0; i < valuesPerStripe[stripeNum]; i++) {
int number = stream[idx++];
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
index d428fbf..6308e47 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
@@ -22,11 +22,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Random;
import org.junit.Before;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.io.ParquetDecodingException;
@@ -42,13 +44,13 @@ public class DeltaBinaryPackingValuesWriterTest {
public void setUp() {
blockSize = 128;
miniBlockNum = 4;
- writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200);
+ writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
random = new Random();
}
@Test(expected = IllegalArgumentException.class)
public void miniBlockSizeShouldBeMultipleOf8() {
- new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100);
+ new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100, new DirectByteBufferAllocator());
}
/* When data size is multiple of Block*/
@@ -154,7 +156,7 @@ public class DeltaBinaryPackingValuesWriterTest {
System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
//offset should be correct
- reader.initFromPage(100, pageContent, contentOffsetInPage);
+ reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
int offset= reader.getNextOffset();
assertEquals(valueContent.length + contentOffsetInPage, offset);
@@ -187,7 +189,7 @@ public class DeltaBinaryPackingValuesWriterTest {
}
writeData(data);
reader = new DeltaBinaryPackingValuesReader();
- reader.initFromPage(100, writer.getBytes().toByteArray(), 0);
+ reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
for (int i = 0; i < data.length; i++) {
if (i % 3 == 0) {
reader.skip();
@@ -243,7 +245,7 @@ public class DeltaBinaryPackingValuesWriterTest {
+ blockFlushed * miniBlockNum //bitWidth of mini blocks
+ (5.0 * blockFlushed);//min delta for each block
assertTrue(estimatedSize >= page.length);
- reader.initFromPage(100, page, 0);
+ reader.initFromPage(100, ByteBuffer.wrap(page), 0);
for (int i = 0; i < length; i++) {
assertEquals(data[i], reader.readInteger());
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
index dc69fcc..40f6bfc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.values.delta.benchmark;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import java.util.Random;
@@ -77,8 +78,8 @@ public class BenchmarkIntegerOutputSize {
}
public void testRandomIntegers(IntFunc func,int bitWidth) {
- DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000);
- RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000);
+ DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+ RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000, new DirectByteBufferAllocator());
for (int i = 0; i < dataSize; i++) {
int v = func.getIntValue();
delta.writeInteger(v);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index 24b007f..4ad5dad 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -25,6 +25,7 @@ import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -33,6 +34,7 @@ import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReade
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Random;
@AxisRange(min = 0, max = 1)
@@ -54,8 +56,8 @@ public class BenchmarkReadingRandomIntegers {
data[i] = random.nextInt(100) - 200;
}
- ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
- ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000);
+ ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+ ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator());
for (int i = 0; i < data.length; i++) {
delta.writeInteger(data[i]);
@@ -86,7 +88,7 @@ public class BenchmarkReadingRandomIntegers {
}
private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException {
- reader.initFromPage(data.length, deltaBytes, 0);
+ reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0);
for (int i = 0; i < data.length; i++) {
reader.readInteger();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
index 50c97cf..80e6533 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
@@ -25,6 +25,7 @@ import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
@@ -50,21 +51,21 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{
@BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
@Test
public void writeDeltaPackingTest(){
- DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+ DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
runWriteTest(writer);
}
@BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
@Test
public void writeRLETest(){
- ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000);
+ ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator());
runWriteTest(writer);
}
@BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
@Test
public void writeDeltaPackingTest2(){
- DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+ DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
runWriteTest(writer);
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
index 3141fd7..0dc7cb0 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
@@ -23,6 +23,7 @@ import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import java.util.Random;
@@ -42,7 +43,7 @@ public class SmallRangeWritingBenchmarkTest extends RandomWritingBenchmarkTest {
@BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
@Test
public void writeRLEWithSmallBitWidthTest(){
- ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000);
+ ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000, new DirectByteBufferAllocator());
runWriteTest(writer);
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
index aaae064..d7ebee5 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.junit.Test;
import org.junit.Assert;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.Utils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -32,11 +33,15 @@ public class TestDeltaLengthByteArray {
String[] values = { "parquet", "hadoop", "mapreduce"};
+ private DeltaLengthByteArrayValuesWriter getDeltaLengthByteArrayValuesWriter() {
+ return new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
+ }
+
@Test
public void testSerialization () throws IOException {
- DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+ DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter();
DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
-
+
Utils.writeData(writer, values);
Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
@@ -47,7 +52,7 @@ public class TestDeltaLengthByteArray {
@Test
public void testRandomStrings() throws IOException {
- DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+ DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter();
DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
String[] values = Utils.getRandomStringSamples(1000, 32);
@@ -61,7 +66,7 @@ public class TestDeltaLengthByteArray {
@Test
public void testLengths() throws IOException {
- DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+ DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter();
ValuesReader reader = new DeltaBinaryPackingValuesReader();
Utils.writeData(writer, values);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
index f5f9d76..69c5e15 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.Utils;
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
@@ -47,7 +48,7 @@ public class BenchmarkDeltaLengthByteArray {
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@Test
public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException {
- PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+ PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
Utils.writeData(writer, values);
@@ -59,7 +60,7 @@ public class BenchmarkDeltaLengthByteArray {
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@Test
public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
- DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+ DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
Utils.writeData(writer, values);