You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ga...@apache.org on 2021/08/11 04:17:51 UTC

[hudi] branch master updated: [HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)

This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a5e496f  [HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)
a5e496f is described below

commit a5e496fe23c3cd89ceff0e4c49d27df325ba5bd8
Author: Shawy Geng <ge...@gmail.com>
AuthorDate: Wed Aug 11 12:17:39 2021 +0800

    [HUDI-2292] MOR should not predicate pushdown when reading with payload_combine type (#3443)
---
 .../hudi/common/testutils/HoodieTestDataGenerator.java  | 17 ++++++++++++++++-
 .../org/apache/hudi/MergeOnReadSnapshotRelation.scala   |  2 +-
 .../org/apache/hudi/functional/TestMORDataSource.scala  | 14 +++++++++++++-
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e4ea186..68d1f2d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -208,8 +208,13 @@ public class HoodieTestDataGenerator {
    */
   public static RawTripTestPayload generateRandomValue(
       HoodieKey key, String instantTime, boolean isFlattened) throws IOException {
+    return generateRandomValue(key, instantTime, isFlattened, 0);
+  }
+
+  public static RawTripTestPayload generateRandomValue(
+      HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException {
     GenericRecord rec = generateGenericRecord(
-        key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
+        key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts,
         false, isFlattened);
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
@@ -591,6 +596,16 @@ public class HoodieTestDataGenerator {
     return updates;
   }
 
+  public List<HoodieRecord> generateUpdatesWithTS(String instantTime, List<HoodieRecord> baseRecords, int ts) throws IOException {
+    List<HoodieRecord> updates = new ArrayList<>();
+    for (HoodieRecord baseRecord : baseRecords) {
+      HoodieRecord record = new HoodieRecord(baseRecord.getKey(),
+          generateRandomValue(baseRecord.getKey(), instantTime, false, ts));
+      updates.add(record);
+    }
+    return updates;
+  }
+
   public List<HoodieRecord> generateUpdatesWithDiffPartition(String instantTime, List<HoodieRecord> baseRecords)
       throws IOException {
     List<HoodieRecord> updates = new ArrayList<>();
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index cf8296c..1d14030 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -122,7 +122,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       dataSchema = tableStructSchema,
       partitionSchema = StructType(Nil),
       requiredSchema = tableStructSchema,
-      filters = filters,
+      filters = Seq.empty,
       options = optParams,
       hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 8855fb0..82f6cf9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -412,13 +412,15 @@ class TestMORDataSource extends HoodieClientTestBase {
     // First Operation:
     // Producing parquet files to three default partitions.
     // SNAPSHOT view on MOR table with parquet files only.
-    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val hoodieRecords1 = dataGen.generateInserts("001", 100)
+    val records1 = recordsToStrings(hoodieRecords1).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.PAYLOAD_CLASS.key, classOf[DefaultHoodieRecordPayload].getName)
       .mode(SaveMode.Overwrite)
       .save(basePath)
     val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
@@ -484,6 +486,16 @@ class TestMORDataSource extends HoodieClientTestBase {
     verifyShow(hudiIncDF1)
     verifyShow(hudiIncDF2)
     verifyShow(hudiIncDF1Skipmerge)
+
+    val record3 = recordsToStrings(dataGen.generateUpdatesWithTS("003", hoodieRecords1, -1))
+    spark.read.json(spark.sparkContext.parallelize(record3, 2))
+      .write.format("org.apache.hudi").options(commonOpts)
+      .mode(SaveMode.Append).save(basePath)
+    val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiSnapshotDF3.count())
+    assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
   }
 
   @Test