Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/11/04 23:00:56 UTC

[GitHub] [hudi] satishkotha commented on a change in pull request #2196: [HUDI-1349]spark sql support overwrite use replace action

satishkotha commented on a change in pull request #2196:
URL: https://github.com/apache/hudi/pull/2196#discussion_r517675835



##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -156,6 +162,76 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }
 
+  @Test def testOverWriteModeUseReplaceAction(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
+      .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
+    assertEquals(2, commits.size)
+    assertEquals("commit", commits(0))
+    assertEquals("replacecommit", commits(1))
+  }
+
+  @Test def testOverWriteModeUseReplaceActionOnDisjointPartitions(): Unit = {
+    // step1: Write 5 records to hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // step2: Write 7 more records using SaveMode.Overwrite for partition2 DEFAULT_SECOND_PARTITION_PATH
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    inputDF2.registerTempTable("tmpTable")
+
+    // step3: Query the row count from the hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
+    val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()

Review comment:
       tmpTable only has inputDF2 registered, so you will not get data for partition1 even with the SaveMode.Append in line 206. Don't you need to read back all the data from the table? Can you please fix the test setup?
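   For illustration, a minimal sketch of the setup I have in mind (hypothetical; `allRecordsDF` is a name I made up, and the glob assumes the test generator's default three-level partition paths):
   `// read back everything in the table, not just inputDF2
   val allRecordsDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
   allRecordsDF.registerTempTable("tmpTable")
   val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()`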

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -156,6 +162,76 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
   }
 
+  @Test def testOverWriteModeUseReplaceAction(): Unit = {

Review comment:
       I ran a similar test using the quick start setup guide:
   1. Ingest 10 records for partition "2020/03/11":
   `scala> val inserts = convertToStringList(dataGen.generateInserts(10))
   inserts: java.util.List[String] = [{"ts": 0, "uuid": "299d5202-1ea0-4918-9d2f-2365bc1c2402", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "2020/03/11"}, {"ts": 0, "uuid": "0fc23a14-c815-4b09-bff1-c6193a6de5b7", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "2020/03/11"}, {"ts": 0, "uuid": "7136e8f8-ed82-4fc4-b60d-f7367f7be791", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.5731835407930634, "begin_lon": 0.4923479652912024, "end_lat":...
   scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
   warning: there was one deprecation warning; re-run with -deprecation for details
   df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]
   
   scala> df.write.format("org.apache.hudi").
        |     options(getQuickstartWriteConfigs).
        |     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        |     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        |     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        |     option(TABLE_NAME, tableName).
        |     mode(Overwrite).
        |     save(basePath);`
   
   2. Query the data back and see that records are written correctly
   `scala> val tripsSnapshotDF = spark.
        |     read.
        |     format("org.apache.hudi").
        |     load(basePath + "/*/*/*/*")
   20/11/04 14:51:53 WARN DefaultSource: Loading Base File Only View.
   tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]
   
   scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
   
   scala> 
   
   scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
   +-------------------+--------------------+----------------------+---------+----------+------------------+
   |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
   +-------------------+--------------------+----------------------+---------+----------+------------------+
   |     20201104145141|299d5202-1ea0-491...|            2020/03/11|rider-213|driver-213|34.158284716382845|
   |     20201104145141|0fc23a14-c815-4b0...|            2020/03/11|rider-213|driver-213|  43.4923811219014|
   |     20201104145141|7136e8f8-ed82-4fc...|            2020/03/11|rider-213|driver-213| 64.27696295884016|
   |     20201104145141|5ffa488e-d75e-4ef...|            2020/03/11|rider-213|driver-213| 93.56018115236618|
   |     20201104145141|cf09166f-dc3f-45e...|            2020/03/11|rider-213|driver-213|17.851135255091155|
   |     20201104145141|6f522490-e29e-419...|            2020/03/11|rider-213|driver-213|19.179139106643607|
   |     20201104145141|db97e3ef-ad7a-4e8...|            2020/03/11|rider-213|driver-213| 33.92216483948643|
   |     20201104145141|a42d7c22-d0bf-4b9...|            2020/03/11|rider-213|driver-213| 66.62084366450246|
   |     20201104145141|94154d3e-c3da-436...|            2020/03/11|rider-213|driver-213| 41.06290929046368|
   |     20201104145141|618b3f38-bb71-402...|            2020/03/11|rider-213|driver-213| 27.79478688582596|
   +-------------------+--------------------+----------------------+---------+----------+------------------+
   `
   
   3. Use insert_overwrite and write to a new partition (this is with master, not using your change, so I still run the insert_overwrite operation with Append mode):
   `scala>  val dataGen = new DataGenerator(Array("2020/09/11"))
   dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@f00b18a
   
   scala> val inserts2 = convertToStringList(dataGen.generateInserts(1))
   scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts2, 1))
   scala> df.write.format("org.apache.hudi").
        |     options(getQuickstartWriteConfigs).
        |     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        |     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        |     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        |     option(TABLE_NAME, tableName).
        |     mode(Append).
     |     option(OPERATION_OPT_KEY, "insert_overwrite").
     |     save(basePath);`
   4. Query the data back
   `scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
   +-------------------+--------------------+----------------------+---------+----------+------------------+
   |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
   +-------------------+--------------------+----------------------+---------+----------+------------------+
   |     20201104145258|299d5202-1ea0-491...|            2020/03/11|rider-213|driver-213|34.158284716382845|
   |     20201104145258|0fc23a14-c815-4b0...|            2020/03/11|rider-213|driver-213|  43.4923811219014|
   |     20201104145258|7136e8f8-ed82-4fc...|            2020/03/11|rider-213|driver-213| 64.27696295884016|
   |     20201104145258|5ffa488e-d75e-4ef...|            2020/03/11|rider-213|driver-213| 93.56018115236618|
   |     20201104145258|cf09166f-dc3f-45e...|            2020/03/11|rider-213|driver-213|17.851135255091155|
   |     20201104145258|6f522490-e29e-419...|            2020/03/11|rider-213|driver-213|19.179139106643607|
   |     20201104145258|db97e3ef-ad7a-4e8...|            2020/03/11|rider-213|driver-213| 33.92216483948643|
   |     20201104145258|a42d7c22-d0bf-4b9...|            2020/03/11|rider-213|driver-213| 66.62084366450246|
   |     20201104145258|94154d3e-c3da-436...|            2020/03/11|rider-213|driver-213| 41.06290929046368|
   |     20201104145258|618b3f38-bb71-402...|            2020/03/11|rider-213|driver-213| 27.79478688582596|
   |     20201104145348|4dac8aa3-b8fa-410...|            2020/09/11|rider-284|driver-284|49.527694252432056|
   +-------------------+--------------------+----------------------+---------+----------+------------------+
   `
   
   As you can see in step 4, we see all 11 records. With `SaveMode.Overwrite`, we should only see 1 record. Hope this is clear.
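   
   For reference, roughly the check I'd expect to pass once SaveMode.Overwrite maps to the replace action (a hypothetical sketch against the quick start session above; `afterOverwriteDF` is my own name):
   `val afterOverwriteDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
   // the replace action should overwrite the whole table, leaving only the
   // single record written to partition 2020/09/11
   assert(afterOverwriteDF.count() == 1)`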




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org