You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/10/26 20:43:34 UTC
[beam] branch release-2.34.0 updated: [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.34.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.34.0 by this push:
new 2d655ea [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
new 621a4f9 Merge pull request #15806 from ibzib/parquet-cp
2d655ea is described below
commit 2d655eac45d036dc23962d09e45642b77c6f5cea
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Mon Oct 25 17:21:47 2021 +0200
[BEAM-13104] ParquetIO: SplitReadFn must read the whole block
---
.../src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java | 6 +++---
.../src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java | 3 ++-
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 81f5978..c733f67 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -888,12 +888,12 @@ public class ParquetIO {
continue;
}
if (record == null) {
- // only happens with FilteredRecordReader at end of block
+ // it happens when a record is filtered out in this block
LOG.debug(
- "filtered record reader reached end of block in block {} in file {}",
+ "record is filtered out by reader in block {} in file {}",
currentBlock,
file.toString());
- break;
+ continue;
}
if (recordReader.shouldSkipCurrentRecord()) {
// this record is being filtered via the filter2 package
diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index d2609b4..261abd9 100644
--- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -574,7 +574,8 @@ public class ParquetIOTest implements Serializable {
readPipeline.apply(
ParquetIO.read(SCHEMA)
.from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
- .withConfiguration(configuration));
+ .withConfiguration(configuration)
+ .withSplit());
PAssert.that(readBack).containsInAnyOrder(expectedRecords);
readPipeline.run().waitUntilFinish();
}