You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/26 20:43:34 UTC

[beam] branch release-2.34.0 updated: [BEAM-13104] ParquetIO: SplitReadFn must read the whole block

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.34.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.34.0 by this push:
     new 2d655ea  [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
     new 621a4f9  Merge pull request #15806 from ibzib/parquet-cp
2d655ea is described below

commit 2d655eac45d036dc23962d09e45642b77c6f5cea
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Mon Oct 25 17:21:47 2021 +0200

    [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
---
 .../src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java     | 6 +++---
 .../src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 81f5978..c733f67 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -888,12 +888,12 @@ public class ParquetIO {
                   continue;
                 }
                 if (record == null) {
-                  // only happens with FilteredRecordReader at end of block
+                  // it happens when a record is filtered out in this block
                   LOG.debug(
-                      "filtered record reader reached end of block in block {} in file {}",
+                      "record is filtered out by reader in block {} in file {}",
                       currentBlock,
                       file.toString());
-                  break;
+                  continue;
                 }
                 if (recordReader.shouldSkipCurrentRecord()) {
                   // this record is being filtered via the filter2 package
diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index d2609b4..261abd9 100644
--- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -574,7 +574,8 @@ public class ParquetIOTest implements Serializable {
         readPipeline.apply(
             ParquetIO.read(SCHEMA)
                 .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
-                .withConfiguration(configuration));
+                .withConfiguration(configuration)
+                .withSplit());
     PAssert.that(readBack).containsInAnyOrder(expectedRecords);
     readPipeline.run().waitUntilFinish();
   }