You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ga...@apache.org on 2021/08/11 04:17:51 UTC
[hudi] branch master updated: [HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)
This is an automated email from the ASF dual-hosted git repository.
garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a5e496f [HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)
a5e496f is described below
commit a5e496fe23c3cd89ceff0e4c49d27df325ba5bd8
Author: Shawy Geng <ge...@gmail.com>
AuthorDate: Wed Aug 11 12:17:39 2021 +0800
[HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)
---
.../hudi/common/testutils/HoodieTestDataGenerator.java | 17 ++++++++++++++++-
.../org/apache/hudi/MergeOnReadSnapshotRelation.scala | 2 +-
.../org/apache/hudi/functional/TestMORDataSource.scala | 14 +++++++++++++-
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e4ea186..68d1f2d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -208,8 +208,13 @@ public class HoodieTestDataGenerator {
*/
public static RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened) throws IOException {
+ return generateRandomValue(key, instantTime, isFlattened, 0);
+ }
+
+ public static RawTripTestPayload generateRandomValue(
+ HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException {
GenericRecord rec = generateGenericRecord(
- key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
+ key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts,
false, isFlattened);
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
@@ -591,6 +596,16 @@ public class HoodieTestDataGenerator {
return updates;
}
+ public List<HoodieRecord> generateUpdatesWithTS(String instantTime, List<HoodieRecord> baseRecords, int ts) throws IOException {
+ List<HoodieRecord> updates = new ArrayList<>();
+ for (HoodieRecord baseRecord : baseRecords) {
+ HoodieRecord record = new HoodieRecord(baseRecord.getKey(),
+ generateRandomValue(baseRecord.getKey(), instantTime, false, ts));
+ updates.add(record);
+ }
+ return updates;
+ }
+
public List<HoodieRecord> generateUpdatesWithDiffPartition(String instantTime, List<HoodieRecord> baseRecords)
throws IOException {
List<HoodieRecord> updates = new ArrayList<>();
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index cf8296c..1d14030 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -122,7 +122,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = tableStructSchema,
- filters = filters,
+ filters = Seq.empty,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 8855fb0..82f6cf9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -412,13 +412,15 @@ class TestMORDataSource extends HoodieClientTestBase {
// First Operation:
// Producing parquet files to three default partitions.
// SNAPSHOT view on MOR table with parquet files only.
- val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val hoodieRecords1 = dataGen.generateInserts("001", 100)
+ val records1 = recordsToStrings(hoodieRecords1).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .option(DataSourceWriteOptions.PAYLOAD_CLASS.key, classOf[DefaultHoodieRecordPayload].getName)
.mode(SaveMode.Overwrite)
.save(basePath)
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
@@ -484,6 +486,16 @@ class TestMORDataSource extends HoodieClientTestBase {
verifyShow(hudiIncDF1)
verifyShow(hudiIncDF2)
verifyShow(hudiIncDF1Skipmerge)
+
+ val record3 = recordsToStrings(dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1))
+ spark.read.json(spark.sparkContext.parallelize(record3, 2))
+ .write.format("org.apache.hudi").options(commonOpts)
+ .mode(SaveMode.Append).save(basePath)
+ val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF3.count())
+ assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
}
@Test