You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/26 18:01:12 UTC

[beam] branch master updated: [BEAM-13104] ParquetIO: SplitReadFn must read the whole block

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 136eadc  [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
     new 46c649a  Merge pull request #15789 from aromanenko-dev/BEAM-13104-ParquetIO-filter
136eadc is described below

commit 136eadc121e136e25aafc2b65f130526e7f20142
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Mon Oct 25 17:21:47 2021 +0200

    [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
---
 .../src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java     | 6 +++---
 .../src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 81f5978..c733f67 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -888,12 +888,12 @@ public class ParquetIO {
                   continue;
                 }
                 if (record == null) {
-                  // only happens with FilteredRecordReader at end of block
+                  // it happens when a record is filtered out in this block
                   LOG.debug(
-                      "filtered record reader reached end of block in block {} in file {}",
+                      "record is filtered out by reader in block {} in file {}",
                       currentBlock,
                       file.toString());
-                  break;
+                  continue;
                 }
                 if (recordReader.shouldSkipCurrentRecord()) {
                   // this record is being filtered via the filter2 package
diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index d2609b4..261abd9 100644
--- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -574,7 +574,8 @@ public class ParquetIOTest implements Serializable {
         readPipeline.apply(
             ParquetIO.read(SCHEMA)
                 .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
-                .withConfiguration(configuration));
+                .withConfiguration(configuration)
+                .withSplit());
     PAssert.that(readBack).containsInAnyOrder(expectedRecords);
     readPipeline.run().waitUntilFinish();
   }