Posted to commits@parquet.apache.org by to...@apache.org on 2014/07/16 15:51:57 UTC

git commit: PARQUET-9: Filtering records across multiple blocks

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 9c2fab441 -> 2d8ebdbe0


PARQUET-9: Filtering records across multiple blocks

Update of the minimal fix discussed in https://github.com/apache/incubator-parquet-mr/pull/1, with the recursive call changed to a loop.
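
The shape of the change, for readers who want it before the diff: when the FilteredRecordReader returns null at the end of a block, nextKeyValue() now skips ahead to the next block in a loop until it finds a matching record or runs out of records. A minimal paraphrase of the patched logic follows (field names as in the diff below; advanceToNextMatch is an illustrative name, not in the patch, where this logic lives inline in nextKeyValue()):

  // Sketch of the loop-based fix in InternalParquetRecordReader (a paraphrase
  // of the patch below, not the exact hunk).
  private boolean advanceToNextMatch() throws IOException {
    currentValue = recordReader.read();
    current++;
    // With FilteredRecordReader, read() returns null at the end of a block
    // whose remaining records were all filtered out.
    while (currentValue == null) {
      current = totalCountLoadedSoFar;   // jump past the exhausted block
      if (current >= total) {
        return false;                    // no blocks left: end of file
      }
      checkRead();                       // load the next block
      currentValue = recordReader.read();
      current++;
    }
    return true;
  }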

Author: Tom White <to...@cloudera.com>
Author: Steven Willis <sw...@compete.com>

Closes #9 from tomwhite/filtering-records-across-multiple-blocks and squashes the following commits:

afb08a4 [Tom White] Minimal fix
9e723ee [Steven Willis] Test for filtering records across multiple blocks


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/2d8ebdbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/2d8ebdbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/2d8ebdbe

Branch: refs/heads/master
Commit: 2d8ebdbe00786823658bcdd2817e6b5afee15b25
Parents: 9c2fab4
Author: Tom White <to...@cloudera.com>
Authored: Wed Jul 16 14:50:29 2014 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Wed Jul 16 14:50:29 2014 +0100

----------------------------------------------------------------------
 .../parquet/avro/TestSpecificReadWrite.java     | 52 +++++++++++++++++++-
 .../hadoop/InternalParquetRecordReader.java     | 14 +++++-
 2 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2d8ebdbe/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
index 48855a0..03224c2 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificReadWrite.java
@@ -77,6 +77,52 @@ public class TestSpecificReadWrite {
   }
 
   @Test
+  public void testFilterMatchesMultipleBlocks() throws IOException {
+    Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    for (int i = 0; i < 10000; i++) {
+      assertEquals(getVwPolo().toString(), reader.read().toString());
+      assertEquals(getVwPassat().toString(), reader.read().toString());
+    }
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterMatchesNoBlocks() throws IOException {
+    Path path = writeCarsToParquetFile(10000, CompressionCodecName.UNCOMPRESSED, false, DEFAULT_BLOCK_SIZE/64, DEFAULT_PAGE_SIZE/64);
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Bogus")));
+    assertNull(reader.read());
+  }
+
+  @Test
+  public void testFilterMatchesFinalBlockOnly() throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    Car vwPolo   = getVwPolo();
+    Car vwPassat = getVwPassat();
+    Car bmwMini  = getBmwMini();
+
+    ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path, Car.SCHEMA$,
+        CompressionCodecName.UNCOMPRESSED, DEFAULT_BLOCK_SIZE/128, DEFAULT_PAGE_SIZE/128,
+        false);
+    for (int i = 0; i < 10000; i++) {
+      writer.write(vwPolo);
+      writer.write(vwPassat);
+      writer.write(vwPolo);
+    }
+    writer.write(bmwMini); // only write BMW in last block
+    writer.close();
+
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make",
+        equalTo("BMW")));
+    assertEquals(getBmwMini().toString(), reader.read().toString());
+    assertNull(reader.read());
+  }
+
+  @Test
   public void testFilterWithDictionary() throws IOException {
     Path path = writeCarsToParquetFile(1,CompressionCodecName.UNCOMPRESSED,true);
     ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
@@ -159,6 +205,10 @@ public class TestSpecificReadWrite {
   }
 
   private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
+    return writeCarsToParquetFile(num, compression, enableDictionary, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary, int blockSize, int pageSize) throws IOException {
     File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
     tmp.deleteOnExit();
     tmp.delete();
@@ -169,7 +219,7 @@ public class TestSpecificReadWrite {
     Car bmwMini  = getBmwMini();
 
     ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path,Car.SCHEMA$, compression,
-        DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary);
+        blockSize, pageSize, enableDictionary);
     for (int i = 0; i < num; i++) {
       writer.write(vwPolo);
       writer.write(vwPassat);

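For reference, the read pattern these tests exercise looks like this in application code (a sketch using the same AvroParquetReader and column()/equalTo() helpers as the tests above; the drain loop is illustrative):

  ParquetReader<Car> reader = new AvroParquetReader<Car>(path,
      column("make", equalTo("Volkswagen")));
  try {
    Car car;
    while ((car = reader.read()) != null) {
      // process each matching record; with this fix, matches in later
      // blocks are returned instead of the stream ending early
    }
  } finally {
    reader.close();
  }
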
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2d8ebdbe/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index 8d99a29..f3aa81f 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -53,7 +53,7 @@ class InternalParquetRecordReader<T> {
 
   private T currentValue;
   private long total;
-  private int current = 0;
+  private long current = 0;
   private int currentBlock = -1;
   private ParquetFileReader reader;
   private parquet.io.RecordReader<T> recordReader;
@@ -173,8 +173,18 @@ class InternalParquetRecordReader<T> {
       try {
         checkRead();
         currentValue = recordReader.read();
-        if (DEBUG) LOG.debug("read value: " + currentValue);
         current ++;
+        while (currentValue == null) { // only happens with FilteredRecordReader at end of block
+          current = totalCountLoadedSoFar;
+          if (current < total) {
+            checkRead();
+            currentValue = recordReader.read();
+            current ++;
+            continue;
+          }
+          return false;
+        }
+        if (DEBUG) LOG.debug("read value: " + currentValue);
       } catch (RuntimeException e) {
         throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
       }
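
Two details of the fix worth noting: the advance is a loop rather than a recursive call, so a long run of blocks with no matching records cannot deepen the stack; and current is widened from int to long so it can take assignments from totalCountLoadedSoFar and comparisons against the (long) total record count.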