Posted to commits@parquet.apache.org by to...@apache.org on 2014/07/16 15:51:57 UTC
git commit: PARQUET-9: Filtering records across multiple blocks
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 9c2fab441 -> 2d8ebdbe0
PARQUET-9: Filtering records across multiple blocks
Update of the minimal fix discussed in https://github.com/apache/incubator-parquet-mr/pull/1, with the recursive call changed to a loop.
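In short, nextKeyValue() now loops when the FilteredRecordReader returns null at the end of a row group, instead of recursing into itself. A simplified sketch of the looping logic (field and method names follow the InternalParquetRecordReader diff below; this is illustrative, not the verbatim patch):

    // Sketch: advance across row groups until the filter finds a match or the file ends.
    boolean nextKeyValue() throws IOException {
      if (current >= total) {
        return false;                    // nothing left to read
      }
      checkRead();                       // load the next row group if needed
      currentValue = recordReader.read();
      current++;
      while (currentValue == null) {     // only happens with FilteredRecordReader at end of block
        current = totalCountLoadedSoFar; // account for the records the filter skipped
        if (current >= total) {
          return false;                  // no more row groups to try
        }
        checkRead();                     // move on to the next row group
        currentValue = recordReader.read();
        current++;
      }
      return true;
    }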
Author: Tom White <to...@cloudera.com>
Author: Steven Willis <sw...@compete.com>
Closes #9 from tomwhite/filtering-records-across-multiple-blocks and squashes the following commits:
afb08a4 [Tom White] Minimal fix
9e723ee [Steven Willis] Test for filtering records across multiple blocks
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/2d8ebdbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/2d8ebdbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/2d8ebdbe
Branch: refs/heads/master
Commit: 2d8ebdbe00786823658bcdd2817e6b5afee15b25
Parents: 9c2fab4
Author: Tom White <to...@cloudera.com>
Authored: Wed Jul 16 14:50:29 2014 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Wed Jul 16 14:50:29 2014 +0100
----------------------------------------------------------------------
.../parquet/avro/TestSpecificReadWrite.java | 52 +++++++++++++++++++-
.../hadoop/InternalParquetRecordReader.java | 14 +++++-
2 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2d8ebdbe/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
index 48855a0..03224c2 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
@@ -77,6 +77,52 @@ public class TestSpecificReadWrite {
}
@Test
+ public void testFilterMatchesMultipleBlocks() throws IOException {
+ Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+ ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+ for (int i = 0; i < 10000; i++) {
+ assertEquals(getVwPolo().toString(), reader.read().toString());
+ assertEquals(getVwPassat().toString(), reader.read().toString());
+ }
+ assertNull(reader.read());
+ }
+
+ @Test
+ public void testFilterMatchesNoBlocks() throws IOException {
+ Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+ ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Bogus")));
+ assertNull(reader.read());
+ }
+
+ @Test
+ public void testFilterMatchesFinalBlockOnly() throws IOException {
+ File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+ tmp.deleteOnExit();
+ tmp.delete();
+ Path path = new Path(tmp.getPath());
+
+ Car vwPolo = getVwPolo();
+ Car vwPassat = getVwPassat();
+ Car bmwMini = getBmwMini();
+
+ ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path, Car.SCHEMA$,
+ CompressionCodecName.UNCOMPRESSED, DEFAULT_BLOCK_SIZE/128, DEFAULT_PAGE_SIZE/128,
+ false);
+ for (int i = 0; i < 10000; i++) {
+ writer.write(vwPolo);
+ writer.write(vwPassat);
+ writer.write(vwPolo);
+ }
+ writer.write(bmwMini); // only write BMW in last block
+ writer.close();
+
+ ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make",
+ equalTo("BMW")));
+ assertEquals(getBmwMini().toString(), reader.read().toString());
+ assertNull(reader.read());
+ }
+
+ @Test
public void testFilterWithDictionary() throws IOException {
Path path = writeCarsToParquetFile(1,CompressionCodecName.UNCOMPRESSED,true);
ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
@@ -159,6 +205,10 @@ public class TestSpecificReadWrite {
}
private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
+ return writeCarsToParquetFile(num, compression, enableDictionary, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+ }
+
+ private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary, int blockSize, int pageSize) throws IOException {
File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
tmp.deleteOnExit();
tmp.delete();
@@ -169,7 +219,7 @@ public class TestSpecificReadWrite {
Car bmwMini = getBmwMini();
ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path,Car.SCHEMA$, compression,
- DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary);
+ blockSize, pageSize, enableDictionary);
for (int i = 0; i < num; i++) {
writer.write(vwPolo);
writer.write(vwPassat);
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2d8ebdbe/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index 8d99a29..f3aa81f 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -53,7 +53,7 @@ class InternalParquetRecordReader<T> {
private T currentValue;
private long total;
- private int current = 0;
+ private long current = 0;
private int currentBlock = -1;
private ParquetFileReader reader;
private parquet.io.RecordReader<T> recordReader;
@@ -173,8 +173,18 @@ class InternalParquetRecordReader<T> {
try {
checkRead();
currentValue = recordReader.read();
- if (DEBUG) LOG.debug("read value: " + currentValue);
current ++;
+ while (currentValue == null) { // only happens with FilteredRecordReader at end of block
+ current = totalCountLoadedSoFar;
+ if (current < total) {
+ checkRead();
+ currentValue = recordReader.read();
+ current ++;
+ continue;
+ }
+ return false;
+ }
+ if (DEBUG) LOG.debug("read value: " + currentValue);
} catch (RuntimeException e) {
throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
}
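For reference, a minimal usage sketch of reading with a filter after this fix, mirroring the new tests above (the Car schema and the column()/equalTo() helpers come from the test code; treat this as illustrative rather than part of the patch):

    // With the fix, read() keeps returning matches even when they span several row groups,
    // and returns null only once the whole file is exhausted.
    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("BMW")));
    Car car;
    while ((car = reader.read()) != null) {
      System.out.println(car);
    }
    reader.close();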