You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2018/02/21 17:40:13 UTC
[2/2] parquet-mr git commit: PARQUET-787: Limit read allocation size
PARQUET-787: Limit read allocation size
WIP: This updates the `ParquetFileReader` to use multiple buffers when reading a row group, instead of a single humongous allocation. As a consequence, many classes needed to be updated to accept a stream backed by multiple buffers, instead of using a single buffer directly. Assuming a single contiguous buffer would require too many copies.
Author: Ryan Blue <bl...@apache.org>
Closes #390 from rdblue/PARQUET-787-limit-read-allocation-size and squashes the following commits:
4abba3e7a [Ryan Blue] PARQUET-787: Update byte buffer input streams for review comments.
e7c6c5dd2 [Ryan Blue] PARQUET-787: Fix problems from Zoltan's review.
be52b59fa [Ryan Blue] PARQUET-787: Update tests for both ByteBufferInputStreams.
b0b614748 [Ryan Blue] PARQUET-787: Update encodings to use ByteBufferInputStream.
a4fa05ac5 [Ryan Blue] Refactor ByteBufferInputStream implementations.
56b22a6a1 [Ryan Blue] Make allocation size configurable.
103ed3d86 [Ryan Blue] Add tests for ByteBufferInputStream and fix bugs.
614a2bbc8 [Ryan Blue] Limit allocation size to 8MB chunks for better garbage collection.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bbc6cb9
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bbc6cb9
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bbc6cb9
Branch: refs/heads/master
Commit: 8bbc6cb95fd9b4b9e86c924ca1e40fd555ecac1d
Parents: ad80bfe
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Feb 21 09:40:07 2018 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Feb 21 09:40:07 2018 -0800
----------------------------------------------------------------------
.../parquet/column/impl/ColumnReaderImpl.java | 30 +-
.../parquet/column/values/ValuesReader.java | 36 +-
.../bitpacking/BitPackingValuesReader.java | 15 +-
.../bitpacking/ByteBitPackingValuesReader.java | 48 +-
.../delta/DeltaBinaryPackingValuesReader.java | 40 +-
.../DeltaLengthByteArrayValuesReader.java | 30 +-
.../deltastrings/DeltaByteArrayReader.java | 11 +-
.../dictionary/DictionaryValuesReader.java | 9 +-
.../dictionary/PlainValuesDictionary.java | 17 +-
.../values/plain/BinaryPlainValuesReader.java | 31 +-
.../values/plain/BooleanPlainValuesReader.java | 16 +-
.../FixedLenByteArrayPlainValuesReader.java | 29 +-
.../column/values/plain/PlainValuesReader.java | 15 +-
.../rle/RunLengthBitPackingHybridDecoder.java | 2 -
.../RunLengthBitPackingHybridValuesReader.java | 19 +-
.../values/rle/ZeroIntegerValuesReader.java | 11 +-
.../column/impl/TestCorruptDeltaByteArrays.java | 17 +-
.../org/apache/parquet/column/values/Utils.java | 21 +-
.../values/bitpacking/BitPackingPerfTest.java | 3 +-
.../values/bitpacking/TestBitPackingColumn.java | 3 +-
...BinaryPackingValuesWriterForIntegerTest.java | 17 +-
...ltaBinaryPackingValuesWriterForLongTest.java | 15 +-
.../BenchmarkReadingRandomIntegers.java | 3 +-
.../TestDeltaLengthByteArray.java | 6 +-
.../BenchmarkDeltaLengthByteArray.java | 9 +-
.../values/deltastrings/TestDeltaByteArray.java | 10 +-
.../benchmark/BenchmarkDeltaByteArray.java | 17 +-
.../values/dictionary/TestDictionary.java | 36 +-
...unLengthBitPackingHybridIntegrationTest.java | 2 +-
.../TestRunLengthBitPackingHybridEncoder.java | 2 -
.../parquet/bytes/ByteBufferInputStream.java | 86 ++-
.../org/apache/parquet/bytes/BytesInput.java | 98 ++-
.../parquet/bytes/MultiBufferInputStream.java | 382 ++++++++++++
.../parquet/bytes/SingleBufferInputStream.java | 177 ++++++
.../bytes/TestByteBufferInputStreams.java | 597 +++++++++++++++++++
.../bytes/TestMultiBufferInputStream.java | 141 +++++
.../bytes/TestSingleBufferInputStream.java | 130 ++++
.../org/apache/parquet/HadoopReadOptions.java | 9 +-
.../org/apache/parquet/ParquetReadOptions.java | 50 +-
.../org/apache/parquet/hadoop/CodecFactory.java | 2 +-
.../parquet/hadoop/DirectCodecFactory.java | 4 +-
.../parquet/hadoop/ParquetFileReader.java | 91 +--
.../parquet/hadoop/TestDirectCodecFactory.java | 6 +-
43 files changed, 1852 insertions(+), 441 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index 931b4b1..8b47977 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -24,12 +24,11 @@ import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.parquet.column.ValuesType.VALUES;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
@@ -549,7 +548,7 @@ public class ColumnReaderImpl implements ColumnReader {
});
}
- private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
+ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
ValuesReader previousReader = this.dataColumn;
this.currentEncoding = dataEncoding;
@@ -565,13 +564,15 @@ public class ColumnReaderImpl implements ColumnReader {
} else {
this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
}
+
if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
bindToDictionary(dictionary);
} else {
bind(path.getType());
}
+
try {
- dataColumn.initFromPage(pageValueCount, bytes, offset);
+ dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new ParquetDecodingException("could not read page in col " + path, e);
}
@@ -589,16 +590,15 @@ public class ColumnReaderImpl implements ColumnReader {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
- ByteBuffer bytes = page.getBytes().toByteBuffer();
- LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount);
+ BytesInput bytes = page.getBytes();
+ LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
LOG.debug("reading repetition levels at 0");
- rlReader.initFromPage(pageValueCount, bytes, 0);
- int next = rlReader.getNextOffset();
- LOG.debug("reading definition levels at {}", next);
- dlReader.initFromPage(pageValueCount, bytes, next);
- next = dlReader.getNextOffset();
- LOG.debug("reading data at {}", next);
- initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+ ByteBufferInputStream in = bytes.toInputStream();
+ rlReader.initFromPage(pageValueCount, in);
+ LOG.debug("reading definition levels at {}", in.position());
+ dlReader.initFromPage(pageValueCount, in);
+ LOG.debug("reading data at {}", in.position());
+ initDataReader(page.getValueEncoding(), in, page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
@@ -607,9 +607,9 @@ public class ColumnReaderImpl implements ColumnReader {
private void readPageV2(DataPageV2 page) {
this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+ LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
try {
- LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
- initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
+ initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 03aa2f8..b2ec2a5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -20,8 +20,7 @@ package org.apache.parquet.column.values;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.api.Binary;
/**
@@ -40,8 +39,9 @@ public abstract class ValuesReader {
/**
* Called to initialize the column reader from a part of a page.
*
- * The underlying implementation knows how much data to read, so a length
- * is not provided.
+ * Implementations must consume all bytes from the input stream, leaving the
+ * stream ready to read the next section of data. The underlying
+ * implementation knows how much data to read, so a length is not provided.
*
* Each page may contain several sections:
* <ul>
@@ -50,36 +50,12 @@ public abstract class ValuesReader {
* <li> data column
* </ul>
*
- * This function is called with 'offset' pointing to the beginning of one of these sections,
- * and should return the offset to the section following it.
- *
* @param valueCount count of values in this page
- * @param page the array to read from containing the page data (repetition levels, definition levels, data)
- * @param offset where to start reading from in the page
+ * @param in an input stream containing the page data at the correct offset
*
* @throws IOException
*/
- public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
-
- /**
- * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[].
- *
- * This method is only provided for backward compatibility and will be removed in a future release.
- * Please update any code using it as soon as possible.
- * @see #initFromPage(int, ByteBuffer, int)
- */
- @Deprecated
- public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
- this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
- }
-
- /**
- * Called to return offset of the next section
- * @return offset of the next section
- */
- public int getNextOffset() {
- throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
- }
+ public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
/**
* usable when the encoding is dictionary based
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
index a5608cb..bcc828b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -22,7 +22,6 @@ import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
@@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader {
private ByteBufferInputStream in;
private BitPackingReader bitPackingReader;
private final int bitsPerValue;
- private int nextOffset;
/**
* @param bound the maximum value stored by this column
@@ -68,21 +66,16 @@ public class BitPackingValuesReader extends ValuesReader {
/**
* {@inheritDoc}
- * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
+ * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
*/
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
- this.in = new ByteBufferInputStream(in, offset, length);
- this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
- this.nextOffset = offset + length;
- }
- @Override
- public int getNextOffset() {
- return nextOffset;
+ this.in = stream.sliceStream(length);
+ this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
index 7c19340..0445d25 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -19,11 +19,12 @@
package org.apache.parquet.column.values.bitpacking;
import java.io.IOException;
-import java.util.Arrays;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
private final BytePacker packer;
private final int[] decoded = new int[VALUES_AT_A_TIME];
private int decodedPosition = VALUES_AT_A_TIME - 1;
- private ByteBuffer encoded;
- private int encodedPos;
- private int nextOffset;
+ private ByteBufferInputStream in;
public ByteBitPackingValuesReader(int bound, Packer packer) {
this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
@@ -49,37 +48,38 @@ public class ByteBitPackingValuesReader extends ValuesReader {
public int readInteger() {
++ decodedPosition;
if (decodedPosition == decoded.length) {
- encoded.position(encodedPos);
- if (encodedPos + bitWidth > encoded.limit()) {
- // unpack8Values needs at least bitWidth bytes to read from,
- // We have to fill in 0 byte at the end of encoded bytes.
- byte[] tempEncode = new byte[bitWidth];
- encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
- packer.unpack8Values(tempEncode, 0, decoded, 0);
- } else {
- packer.unpack8Values(encoded, encodedPos, decoded, 0);
+ try {
+ if (in.available() < bitWidth) {
+ // unpack8Values needs at least bitWidth bytes to read from,
+ // We have to fill in 0 byte at the end of encoded bytes.
+ byte[] tempEncode = new byte[bitWidth];
+ in.read(tempEncode, 0, in.available());
+ packer.unpack8Values(tempEncode, 0, decoded, 0);
+ } else {
+ ByteBuffer encoded = in.slice(bitWidth);
+ packer.unpack8Values(encoded, encoded.position(), decoded, 0);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read packed values", e);
}
- encodedPos += bitWidth;
decodedPosition = 0;
}
return decoded[decodedPosition];
}
@Override
- public void initFromPage(int valueCount, ByteBuffer page, int offset)
+ public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
int effectiveBitLength = valueCount * bitWidth;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
- LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth);
- this.encoded = page;
- this.encodedPos = offset;
+ LOG.debug("reading {} bytes for {} values of size {} bits.",
+ length, valueCount, bitWidth);
+ // work-around for null values. this will not happen for repetition or
+ // definition levels (never null), but will happen when valueCount has not
+ // been adjusted for null values in the data.
+ length = Math.min(length, stream.available());
+ this.in = stream.sliceStream(length);
this.decodedPosition = VALUES_AT_A_TIME - 1;
- this.nextOffset = offset + length;
- }
-
- @Override
- public int getNextOffset() {
- return nextOffset;
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index a3355d2..bf53846 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.parquet.column.values.delta;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -28,7 +27,6 @@ import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
-import java.io.IOException;
import java.nio.ByteBuffer;
/**
@@ -43,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
private int valuesRead;
private long minDeltaInCurrentBlock;
- private ByteBuffer page;
+
/**
* stores the decoded values including the first value which is written to the header
*/
@@ -54,23 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*/
private int valuesBuffered;
private ByteBufferInputStream in;
- private int nextOffset;
private DeltaBinaryPackingConfig config;
private int[] bitWidths;
/**
- * eagerly load all the data into memory
- *
- * @param valueCount count of values in this page
- * @param page the array to read from containing the page data (repetition levels, definition levels, data)
- * @param offset where to start reading from in the page
- * @throws IOException
+ * eagerly loads all the data into memory
*/
@Override
- public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
- in = new ByteBufferInputStream(page, offset, page.limit() - offset);
+ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+ this.in = stream;
this.config = DeltaBinaryPackingConfig.readConfig(in);
- this.page = page;
this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
allocateValuesBuffer();
bitWidths = new int[config.miniBlockNumInABlock];
@@ -81,14 +72,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
loadNewBlockToBuffer();
}
- this.nextOffset = page.limit() - in.available();
}
-
- @Override
- public int getNextOffset() {
- return nextOffset;
- }
-
+
/**
* the value buffer is allocated so that the size of it is multiple of mini block
* because when writing, data is flushed on a mini block basis
@@ -123,7 +108,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
}
}
- private void loadNewBlockToBuffer() {
+ private void loadNewBlockToBuffer() throws IOException {
try {
minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
} catch (IOException e) {
@@ -152,19 +137,18 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
*
* @param packer the packer created from bitwidth of current mini block
*/
- private void unpackMiniBlock(BytePackerForLong packer) {
+ private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
unpack8Values(packer);
}
}
- private void unpack8Values(BytePackerForLong packer) {
- //calculate the pos because the packer api uses array not stream
- int pos = page.limit() - in.available();
- packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
+ private void unpack8Values(BytePackerForLong packer) throws IOException {
+ // get a single buffer of 8 values. most of the time, this won't require a copy
+ // TODO: update the packer to consume from an InputStream
+ ByteBuffer buffer = in.slice(packer.getBitWidth());
+ packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
this.valuesBuffered += 8;
- //sync the pos in stream
- in.skip(packer.getBitWidth());
}
private void readBitWidthsForMiniBlocks() {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index d810ba8..e6ee1fd 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -22,8 +22,10 @@ package org.apache.parquet.column.values.deltalengthbytearray;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,34 +40,38 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(DeltaLengthByteArrayValuesReader.class);
private ValuesReader lengthReader;
- private ByteBuffer in;
- private int offset;
+ private ByteBufferInputStream in;
public DeltaLengthByteArrayValuesReader() {
this.lengthReader = new DeltaBinaryPackingValuesReader();
}
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset)
+ public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
- LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
- lengthReader.initFromPage(valueCount, in, offset);
- offset = lengthReader.getNextOffset();
- this.in = in;
- this.offset = offset;
+ LOG.debug("init from page at offset {} for length {}",
+ stream.position(), stream.available());
+ lengthReader.initFromPage(valueCount, stream);
+ this.in = stream.remainingStream();
}
@Override
public Binary readBytes() {
int length = lengthReader.readInteger();
- int start = offset;
- offset = start + length;
- return Binary.fromConstantByteBuffer(in, start, length);
+ try {
+ return Binary.fromConstantByteBuffer(in.slice(length));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes");
+ }
}
@Override
public void skip() {
int length = lengthReader.readInteger();
- offset = offset + length;
+ try {
+ in.skipFully(length);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to skip " + length + " bytes");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index 742b515..7a01627 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -21,6 +21,8 @@ package org.apache.parquet.column.values.deltastrings;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -46,13 +48,12 @@ public class DeltaByteArrayReader extends ValuesReader implements RequiresPrevio
}
@Override
- public void initFromPage(int valueCount, ByteBuffer page, int offset)
+ public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
- prefixLengthReader.initFromPage(valueCount, page, offset);
- int next = prefixLengthReader.getNextOffset();
- suffixReader.initFromPage(valueCount, page, next);
+ prefixLengthReader.initFromPage(valueCount, stream);
+ suffixReader.initFromPage(valueCount, stream);
}
-
+
@Override
public void skip() {
// read the next value to skip so that previous is correct.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
index 19ff47c..87edda6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -52,11 +52,12 @@ public class DictionaryValuesReader extends ValuesReader {
}
@Override
- public void initFromPage(int valueCount, ByteBuffer page, int offset)
+ public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
- this.in = new ByteBufferInputStream(page, offset, page.limit() - offset);
- if (page.limit() - offset > 0) {
- LOG.debug("init from page at offset {} for length {}", offset, (page.limit() - offset));
+ this.in = stream.remainingStream();
+ if (in.available() > 0) {
+ LOG.debug("init from page at offset {} for length {}",
+ stream.position(), stream.available());
int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
LOG.debug("bit width {}", bitWidth);
decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
index 0fa6cc6..0b8beb2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
@@ -150,10 +151,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+ ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
LongPlainValuesReader longReader = new LongPlainValuesReader();
- longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
+ longReader.initFromPage(dictionaryPage.getDictionarySize(), in);
for (int i = 0; i < longDictionaryContent.length; i++) {
longDictionaryContent[i] = longReader.readLong();
}
@@ -193,10 +194,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+ ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
- doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0);
+ doubleReader.initFromPage(dictionaryPage.getDictionarySize(), in);
for (int i = 0; i < doubleDictionaryContent.length; i++) {
doubleDictionaryContent[i] = doubleReader.readDouble();
}
@@ -236,10 +237,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+ ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
- intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0);
+ intReader.initFromPage(dictionaryPage.getDictionarySize(), in);
for (int i = 0; i < intDictionaryContent.length; i++) {
intDictionaryContent[i] = intReader.readInteger();
}
@@ -279,10 +280,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
*/
public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
super(dictionaryPage);
- final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+ ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
- floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
+ floatReader.initFromPage(dictionaryPage.getDictionarySize(), in);
for (int i = 0; i < floatDictionaryContent.length; i++) {
floatDictionaryContent[i] = floatReader.readFloat();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index 82e5551..6411325 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -20,8 +20,8 @@ package org.apache.parquet.column.values.plain;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
@@ -31,40 +31,37 @@ import org.slf4j.LoggerFactory;
public class BinaryPlainValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(BinaryPlainValuesReader.class);
- private ByteBuffer in;
- private int offset;
+ private ByteBufferInputStream in;
@Override
public Binary readBytes() {
try {
- int length = BytesUtils.readIntLittleEndian(in, offset);
- int start = offset + 4;
- offset = start + length;
- return Binary.fromConstantByteBuffer(in, start, length);
+ int length = BytesUtils.readIntLittleEndian(in);
+ return Binary.fromConstantByteBuffer(in.slice(length));
} catch (IOException e) {
- throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+ throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
} catch (RuntimeException e) {
- throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+ throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
}
}
@Override
public void skip() {
try {
- int length = BytesUtils.readIntLittleEndian(in, offset);
- offset += 4 + length;
+ int length = BytesUtils.readIntLittleEndian(in);
+ in.skipFully(length);
} catch (IOException e) {
- throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+ throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
} catch (RuntimeException e) {
- throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+ throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
}
}
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset)
+ public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
- LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
- this.in = in;
- this.offset = offset;
+ LOG.debug("init from page at offset {} for length {}",
+ stream.position(), stream.available());
+ this.in = stream.remainingStream();
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
index 1f8fc2c..3296daa 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -21,8 +21,8 @@ package org.apache.parquet.column.values.plain;
import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
import org.slf4j.Logger;
@@ -60,17 +60,11 @@ public class BooleanPlainValuesReader extends ValuesReader {
/**
* {@inheritDoc}
- * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int valueCount, ByteBuffer page, int offset)
+ * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
*/
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
- LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
- this.in.initFromPage(valueCount, in, offset);
+ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+ LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
+ this.in.initFromPage(valueCount, stream);
}
-
- @Override
- public int getNextOffset() {
- return this.in.getNextOffset();
- }
-
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 7a14f81..7738de7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.plain;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
@@ -33,9 +34,9 @@ import org.slf4j.LoggerFactory;
*/
public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
private static final Logger LOG = LoggerFactory.getLogger(FixedLenByteArrayPlainValuesReader.class);
- private ByteBuffer in;
- private int offset;
- private int length;
+
+ private final int length;
+ private ByteBufferInputStream in;
public FixedLenByteArrayPlainValuesReader(int length) {
this.length = length;
@@ -44,24 +45,26 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
@Override
public Binary readBytes() {
try {
- int start = offset;
- offset = start + length;
- return Binary.fromConstantByteBuffer(in, start, length);
- } catch (RuntimeException e) {
- throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+ return Binary.fromConstantByteBuffer(in.slice(length));
+ } catch (IOException | RuntimeException e) {
+ throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
}
}
@Override
public void skip() {
- offset += length;
+ try {
+ in.skipFully(length);
+ } catch (IOException | RuntimeException e) {
+ throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
+ }
}
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset)
+ public void initFromPage(int valueCount, ByteBufferInputStream stream)
throws IOException {
- LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
- this.in = in;
- this.offset = offset;
+ LOG.debug("init from page at offset {} for length {}",
+ stream.position(), stream.available());
+ this.in = stream.remainingStream();
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index e79cbb2..726f611 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -19,7 +19,6 @@
package org.apache.parquet.column.values.plain;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.LittleEndianDataInputStream;
@@ -39,18 +38,10 @@ abstract public class PlainValuesReader extends ValuesReader {
protected LittleEndianDataInputStream in;
- /**
- * {@inheritDoc}
- * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
- */
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
- LOG.debug("init from page at offset {} for length {}", offset , (in.limit() - offset));
- this.in = new LittleEndianDataInputStream(toInputStream(in, offset));
- }
-
- private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) {
- return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset);
+ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+ LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
+ this.in = new LittleEndianDataInputStream(stream.remainingStream());
}
public static class DoublePlainValuesReader extends PlainValuesReader {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 6daa349..d682a98 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -22,9 +22,7 @@ package org.apache.parquet.column.values.rle;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
-import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index 4ccf2b8..ebfa76d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -19,7 +19,6 @@
package org.apache.parquet.column.values.rle;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
@@ -35,26 +34,16 @@ import org.apache.parquet.io.ParquetDecodingException;
public class RunLengthBitPackingHybridValuesReader extends ValuesReader {
private final int bitWidth;
private RunLengthBitPackingHybridDecoder decoder;
- private int nextOffset;
public RunLengthBitPackingHybridValuesReader(int bitWidth) {
this.bitWidth = bitWidth;
}
@Override
- public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException {
- ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset);
- int length = BytesUtils.readIntLittleEndian(in);
-
- decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
-
- // 4 is for the length which is stored as 4 bytes little endian
- this.nextOffset = offset + length + 4;
- }
-
- @Override
- public int getNextOffset() {
- return this.nextOffset;
+ public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException {
+ int length = BytesUtils.readIntLittleEndian(stream);
+ this.decoder = new RunLengthBitPackingHybridDecoder(
+ bitWidth, stream.sliceStream(length));
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
index f8ff8d0..fe00de9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.rle;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
/**
@@ -30,20 +31,12 @@ import org.apache.parquet.column.values.ValuesReader;
*/
public class ZeroIntegerValuesReader extends ValuesReader {
- private int nextOffset;
-
public int readInteger() {
return 0;
}
@Override
- public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
- this.nextOffset = offset;
- }
-
- @Override
- public int getNextOffset() {
- return nextOffset;
+ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
index 1f39d95..5bcbb88 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.impl;
import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
@@ -100,13 +101,13 @@ public class TestCorruptDeltaByteArrays {
ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer();
DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
- firstPageReader.initFromPage(10, firstPageBytes, 0);
+ firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes));
for (int i = 0; i < 10; i += 1) {
- assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
+ assertEquals(str(i), firstPageReader.readBytes().toStringUsingUTF8());
}
DeltaByteArrayReader corruptPageReader = new DeltaByteArrayReader();
- corruptPageReader.initFromPage(10, corruptPageBytes, 0);
+ corruptPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes));
try {
corruptPageReader.readBytes();
fail("Corrupt page did not throw an exception when read");
@@ -115,7 +116,7 @@ public class TestCorruptDeltaByteArrays {
}
DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
- secondPageReader.initFromPage(10, corruptPageBytes, 0);
+ secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes));
secondPageReader.setPreviousReader(firstPageReader);
for (int i = 10; i < 20; i += 1) {
@@ -140,13 +141,13 @@ public class TestCorruptDeltaByteArrays {
ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
- firstPageReader.initFromPage(10, firstPageBytes, 0);
+ firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes));
for (int i = 0; i < 10; i += 1) {
assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
}
DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
- secondPageReader.initFromPage(10, secondPageBytes, 0);
+ secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes));
secondPageReader.setPreviousReader(firstPageReader);
for (int i = 10; i < 20; i += 1) {
@@ -171,13 +172,13 @@ public class TestCorruptDeltaByteArrays {
ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
- firstPageReader.initFromPage(10, firstPageBytes, 0);
+ firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes));
for (int i = 0; i < 10; i += 1) {
assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
}
DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
- secondPageReader.initFromPage(10, secondPageBytes, 0);
+ secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes));
for (int i = 10; i < 20; i += 1) {
assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i));
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
index 8caad2b..248e039 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.api.Binary;
/**
@@ -59,33 +60,23 @@ public class Utils {
}
}
- public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length)
+ public static Binary[] readData(ValuesReader reader, ByteBufferInputStream stream, int length)
throws IOException {
Binary[] bins = new Binary[length];
- reader.initFromPage(length, ByteBuffer.wrap(data), 0);
+ reader.initFromPage(length, stream);
for(int i=0; i < length; i++) {
bins[i] = reader.readBytes();
}
return bins;
}
-
- public static Binary[] readData(ValuesReader reader, byte[] data, int length)
- throws IOException {
- return readData(reader, data, 0, length);
- }
-
- public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length)
+
+ public static int[] readInts(ValuesReader reader, ByteBufferInputStream stream, int length)
throws IOException {
int[] ints = new int[length];
- reader.initFromPage(length, ByteBuffer.wrap(data), offset);
+ reader.initFromPage(length, stream);
for(int i=0; i < length; i++) {
ints[i] = reader.readInteger();
}
return ints;
}
-
- public static int[] readInts(ValuesReader reader, byte[] data, int length)
- throws IOException {
- return readInts(reader, data, 0, length);
- }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
index 2733b72..656623c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
@@ -88,7 +89,7 @@ public class BitPackingPerfTest {
System.out.print(" no gc <");
for (int k = 0; k < N; k++) {
long t2 = System.nanoTime();
- r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0);
+ r.initFromPage(result.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)));
for (int i = 0; i < result.length; i++) {
result[i] = r.readInteger();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
index d83628a..867af28 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -25,6 +25,7 @@ import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Test;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -175,7 +176,7 @@ public class TestBitPackingColumn {
LOG.debug("bytes: {}", TestBitPacking.toString(bytes));
assertEquals(type.toString(), expected, TestBitPacking.toString(bytes));
ValuesReader r = type.getReader(bound);
- r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0);
+ r.initFromPage(vals.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)));
int[] result = new int[vals.length];
for (int i = 0; i < result.length; i++) {
result[i] = r.readInteger();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
index a3bec4a..ff4a308 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Before;
import org.junit.Test;
@@ -143,7 +144,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
}
@Test
- public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+ public void shouldConsumePageDataInInitialization() throws IOException {
int[] data = new int[2 * blockSize + 3];
for (int i = 0; i < data.length; i++) {
data[i] = i * 32;
@@ -157,12 +158,14 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
int contentOffsetInPage = 33;
System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
- //offset should be correct
- reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
- int offset= reader.getNextOffset();
+ // offset should be correct
+ ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent));
+ stream.skipFully(contentOffsetInPage);
+ reader.initFromPage(100, stream);
+ long offset = stream.position();
assertEquals(valueContent.length + contentOffsetInPage, offset);
- //should be able to read data correclty
+ // should be able to read data correctly
for (int i : data) {
assertEquals(i, reader.readInteger());
}
@@ -191,7 +194,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
}
writeData(data);
reader = new DeltaBinaryPackingValuesReader();
- reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, writer.getBytes().toInputStream());
for (int i = 0; i < data.length; i++) {
if (i % 3 == 0) {
reader.skip();
@@ -247,7 +250,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
+ blockFlushed * miniBlockNum //bitWidth of mini blocks
+ (5.0 * blockFlushed);//min delta for each block
assertTrue(estimatedSize >= page.length);
- reader.initFromPage(100, ByteBuffer.wrap(page), 0);
+ reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page)));
for (int i = 0; i < length; i++) {
assertEquals(data[i], reader.readInteger());
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
index 34e1800..795a591 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Before;
import org.junit.Test;
@@ -157,12 +158,14 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
int contentOffsetInPage = 33;
System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
- //offset should be correct
- reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
- int offset = reader.getNextOffset();
+ // offset should be correct
+ ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent));
+ stream.skipFully(contentOffsetInPage);
+ reader.initFromPage(100, stream);
+ long offset = stream.position();
assertEquals(valueContent.length + contentOffsetInPage, offset);
- //should be able to read data correclty
+ // should be able to read data correctly
for (long i : data) {
assertEquals(i, reader.readLong());
}
@@ -190,7 +193,7 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
}
writeData(data);
reader = new DeltaBinaryPackingValuesReader();
- reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, writer.getBytes().toInputStream());
for (int i = 0; i < data.length; i++) {
if (i % 3 == 0) {
reader.skip();
@@ -244,7 +247,7 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
+ blockFlushed * miniBlockNum //bitWidth of mini blocks
+ (10.0 * blockFlushed);//min delta for each block
assertTrue(estimatedSize >= page.length);
- reader.initFromPage(100, page, 0);
+ reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page)));
for (int i = 0; i < length; i++) {
assertEquals(data[i], reader.readLong());
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index 488208c..ba5d771 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.ValuesWriter;
@@ -91,7 +92,7 @@ public class BenchmarkReadingRandomIntegers {
}
private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException {
- reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0);
+ reader.initFromPage(data.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaBytes)));
for (int i = 0; i < data.length; i++) {
reader.readInteger();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
index d7ebee5..d214a88 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
@@ -43,7 +43,7 @@ public class TestDeltaLengthByteArray {
DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
Utils.writeData(writer, values);
- Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
+ Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length);
for(int i =0; i< bin.length ; i++) {
Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
@@ -57,7 +57,7 @@ public class TestDeltaLengthByteArray {
String[] values = Utils.getRandomStringSamples(1000, 32);
Utils.writeData(writer, values);
- Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
+ Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length);
for(int i =0; i< bin.length ; i++) {
Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
@@ -70,7 +70,7 @@ public class TestDeltaLengthByteArray {
ValuesReader reader = new DeltaBinaryPackingValuesReader();
Utils.writeData(writer, values);
- int[] bin = Utils.readInts(reader, writer.getBytes().toByteArray(), values.length);
+ int[] bin = Utils.readInts(reader, writer.getBytes().toInputStream(), values.length);
for(int i =0; i< bin.length ; i++) {
Assert.assertEquals(values[i].length(), bin[i]);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
index 69c5e15..08d04e6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.deltalengthbytearray.benchmark;
import java.io.IOException;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Rule;
import org.junit.Test;
@@ -52,9 +53,9 @@ public class BenchmarkDeltaLengthByteArray {
BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
Utils.writeData(writer, values);
- byte [] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
Binary[] bin = Utils.readData(reader, data, values.length);
- System.out.println("size " + data.length);
+ System.out.println("size " + data.position());
}
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -64,9 +65,9 @@ public class BenchmarkDeltaLengthByteArray {
DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
Utils.writeData(writer, values);
- byte [] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
Binary[] bin = Utils.readData(reader, data, values.length);
- System.out.println("size " + data.length);
+ System.out.println("size " + data.position());
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
index 4f8f40c..c13a3a2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltastrings;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Test;
import org.junit.Assert;
@@ -63,7 +64,7 @@ public class TestDeltaByteArray {
ValuesReader reader = new DeltaBinaryPackingValuesReader();
Utils.writeData(writer, values);
- byte[] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
int[] bin = Utils.readInts(reader, data, values.length);
// test prefix lengths
@@ -71,9 +72,8 @@ public class TestDeltaByteArray {
Assert.assertEquals(7, bin[1]);
Assert.assertEquals(7, bin[2]);
- int offset = reader.getNextOffset();
reader = new DeltaBinaryPackingValuesReader();
- bin = Utils.readInts(reader, writer.getBytes().toByteArray(), offset, values.length);
+ bin = Utils.readInts(reader, data, values.length);
// test suffix lengths
Assert.assertEquals(10, bin[0]);
Assert.assertEquals(0, bin[1]);
@@ -82,7 +82,7 @@ public class TestDeltaByteArray {
private void assertReadWrite(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception {
Utils.writeData(writer, vals);
- Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), vals.length);
+ Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), vals.length);
for(int i = 0; i< bin.length ; i++) {
Assert.assertEquals(Binary.fromString(vals[i]), bin[i]);
@@ -92,7 +92,7 @@ public class TestDeltaByteArray {
private void assertReadWriteWithSkip(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception {
Utils.writeData(writer, vals);
- reader.initFromPage(vals.length, writer.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(vals.length, writer.getBytes().toInputStream());
for (int i = 0; i < vals.length; i += 2) {
Assert.assertEquals(Binary.fromString(vals[i]), reader.readBytes());
reader.skip();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
index eac4bd2..53578f0 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltastrings.benchmark;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Rule;
import org.junit.Test;
@@ -59,9 +60,9 @@ public class BenchmarkDeltaByteArray {
BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
Utils.writeData(writer, values);
- byte [] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
Binary[] bin = Utils.readData(reader, data, values.length);
- System.out.println("size " + data.length);
+ System.out.println("size " + data.position());
}
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -71,9 +72,9 @@ public class BenchmarkDeltaByteArray {
DeltaByteArrayReader reader = new DeltaByteArrayReader();
Utils.writeData(writer, values);
- byte [] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
Binary[] bin = Utils.readData(reader, data, values.length);
- System.out.println("size " + data.length);
+ System.out.println("size " + data.position());
}
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -83,9 +84,9 @@ public class BenchmarkDeltaByteArray {
BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
Utils.writeData(writer, sortedVals);
- byte [] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
Binary[] bin = Utils.readData(reader, data, values.length);
- System.out.println("size " + data.length);
+ System.out.println("size " + data.position());
}
@BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -95,8 +96,8 @@ public class BenchmarkDeltaByteArray {
DeltaByteArrayReader reader = new DeltaByteArrayReader();
Utils.writeData(writer, sortedVals);
- byte [] data = writer.getBytes().toByteArray();
+ ByteBufferInputStream data = writer.getBytes().toInputStream();
Binary[] bin = Utils.readData(reader, data, values.length);
- System.out.println("size " + data.length);
+ System.out.println("size " + data.position());
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index ada1c93..cf66982 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.junit.Assert;
import org.junit.Test;
@@ -118,7 +119,7 @@ public class TestDictionary {
//Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
ValuesReader reader = new BinaryPlainValuesReader();
- reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, cw.getBytes().toInputStream());
for (long i = 0; i < 100; i++) {
assertEquals(Binary.fromString("str" + i), reader.readBytes());
@@ -204,13 +205,13 @@ public class TestDictionary {
DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64);
- cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+ cr.initFromPage(COUNT, bytes1.toInputStream());
for (long i = 0; i < COUNT; i++) {
long back = cr.readLong();
assertEquals(i % 50, back);
}
- cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+ cr.initFromPage(COUNT2, bytes2.toInputStream());
for (long i = COUNT2; i > 0; i--) {
long back = cr.readLong();
assertEquals(i % 50, back);
@@ -228,7 +229,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, cw.getBytes().toInputStream());
for (long i = 0; i < 100; i++) {
assertEquals(i, reader.readLong());
@@ -274,13 +275,13 @@ public class TestDictionary {
final DictionaryValuesReader cr = initDicReader(cw, DOUBLE);
- cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+ cr.initFromPage(COUNT, bytes1.toInputStream());
for (double i = 0; i < COUNT; i++) {
double back = cr.readDouble();
assertEquals(i % 50, back, 0.0);
}
- cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+ cr.initFromPage(COUNT2, bytes2.toInputStream());
for (double i = COUNT2; i > 0; i--) {
double back = cr.readDouble();
assertEquals(i % 50, back, 0.0);
@@ -299,7 +300,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, cw.getBytes().toInputStream());
for (double i = 0; i < 100; i++) {
assertEquals(i, reader.readDouble(), 0.00001);
@@ -345,13 +346,13 @@ public class TestDictionary {
DictionaryValuesReader cr = initDicReader(cw, INT32);
- cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+ cr.initFromPage(COUNT, bytes1.toInputStream());
for (int i = 0; i < COUNT; i++) {
int back = cr.readInteger();
assertEquals(i % 50, back);
}
- cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+ cr.initFromPage(COUNT2, bytes2.toInputStream());
for (int i = COUNT2; i > 0; i--) {
int back = cr.readInteger();
assertEquals(i % 50, back);
@@ -370,7 +371,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, cw.getBytes().toInputStream());
for (int i = 0; i < 100; i++) {
assertEquals(i, reader.readInteger());
@@ -416,13 +417,13 @@ public class TestDictionary {
DictionaryValuesReader cr = initDicReader(cw, FLOAT);
- cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+ cr.initFromPage(COUNT, bytes1.toInputStream());
for (float i = 0; i < COUNT; i++) {
float back = cr.readFloat();
assertEquals(i % 50, back, 0.0f);
}
- cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+ cr.initFromPage(COUNT2, bytes2.toInputStream());
for (float i = COUNT2; i > 0; i--) {
float back = cr.readFloat();
assertEquals(i % 50, back, 0.0f);
@@ -441,7 +442,7 @@ public class TestDictionary {
}
}
- reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+ reader.initFromPage(100, cw.getBytes().toInputStream());
for (float i = 0; i < 100; i++) {
assertEquals(i, reader.readFloat(), 0.00001);
@@ -476,8 +477,9 @@ public class TestDictionary {
// pretend there are 100 nulls. what matters is offset = bytes.length.
ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0x00, 0x01, 0x02, 0x03}); // data doesn't matter
- int offset = bytes.remaining();
- reader.initFromPage(100, bytes, offset);
+ ByteBufferInputStream stream = ByteBufferInputStream.wrap(bytes);
+ stream.skipFully(stream.available());
+ reader.initFromPage(100, stream);
}
private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type)
@@ -490,14 +492,14 @@ public class TestDictionary {
}
private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
- cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
+ cr.initFromPage(COUNT, bytes.toInputStream());
for (int i = 0; i < COUNT; i++) {
Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8());
}
}
private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
- cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
+ cr.initFromPage(COUNT, bytes.toInputStream());
for (int i = 0; i < COUNT; i++) {
Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8());
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 712fb27..173d8fa 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -72,7 +72,7 @@ public class RunLengthBitPackingHybridIntegrationTest {
numValues += 1000;
ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
- ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes);
+ ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes);
RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 5696d7b..dd329c0 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -21,14 +21,12 @@ package org.apache.parquet.column.values.rle;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
-import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.bitpacking.BytePacker;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
index 5b3b853..fc92b6b 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
@@ -16,67 +16,57 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.parquet.bytes;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
-/**
- * This ByteBufferInputStream does not consume the ByteBuffer being passed in,
- * but will create a slice of the current buffer.
- */
-public class ByteBufferInputStream extends InputStream {
-
- protected ByteBuffer byteBuf;
- protected int initPos;
- protected int count;
- public ByteBufferInputStream(ByteBuffer buffer) {
- this(buffer, buffer.position(), buffer.remaining());
- }
-
- public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
- ByteBuffer temp = buffer.duplicate();
- temp.position(offset);
- byteBuf = temp.slice();
- byteBuf.limit(count);
- this.initPos = offset;
- this.count = count;
- }
-
- public ByteBuffer toByteBuffer() {
- return byteBuf.slice();
+public abstract class ByteBufferInputStream extends InputStream {
+
+ public static ByteBufferInputStream wrap(ByteBuffer... buffers) {
+ if (buffers.length == 1) {
+ return new SingleBufferInputStream(buffers[0]);
+ } else {
+ return new MultiBufferInputStream(Arrays.asList(buffers));
+ }
}
-
- @Override
- public int read() throws IOException {
- if (!byteBuf.hasRemaining()) {
- return -1;
+
+ public static ByteBufferInputStream wrap(List<ByteBuffer> buffers) {
+ if (buffers.size() == 1) {
+ return new SingleBufferInputStream(buffers.get(0));
+ } else {
+ return new MultiBufferInputStream(buffers);
}
- //Workaround for unsigned byte
- return byteBuf.get() & 0xFF;
}
- @Override
- public int read(byte[] bytes, int offset, int length) throws IOException {
- int count = Math.min(byteBuf.remaining(), length);
- if (count == 0) return -1;
- byteBuf.get(bytes, offset, count);
- return count;
+ public abstract long position();
+
+ public void skipFully(long n) throws IOException {
+ long skipped = skip(n);
+ if (skipped < n) {
+ throw new EOFException(
+ "Not enough bytes to skip: " + skipped + " < " + n);
+ }
}
-
- @Override
- public long skip(long n) {
- if (n > byteBuf.remaining())
- n = byteBuf.remaining();
- int pos = byteBuf.position();
- byteBuf.position((int)(pos + n));
- return n;
+
+ public abstract int read(ByteBuffer out);
+
+ public abstract ByteBuffer slice(int length) throws EOFException;
+
+ public abstract List<ByteBuffer> sliceBuffers(long length) throws EOFException;
+
+ public ByteBufferInputStream sliceStream(long length) throws EOFException {
+ return ByteBufferInputStream.wrap(sliceBuffers(length));
}
+ public abstract List<ByteBuffer> remainingBuffers();
- @Override
- public int available() {
- return byteBuf.remaining();
+ public ByteBufferInputStream remainingStream() {
+ return ByteBufferInputStream.wrap(remainingBuffers());
}
}