Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:10:55 UTC

[GitHub] [hudi] leesf commented on a change in pull request #3055: [HUDI-1991] Fixing drop dups exception in bulk insert row writer path

leesf commented on a change in pull request #3055:
URL: https://github.com/apache/hudi/pull/3055#discussion_r649626906



##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##########
@@ -181,8 +181,45 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
+        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
+        DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY -> "true",
+        INSERT_DROP_DUPS_OPT_KEY -> "true",
+        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+        DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+      // generate the inserts
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(100)
+      val recordsSeq = convertRowListToSeq(records)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
+      fail("Drop duplicates with bulk insert in row writing should have thrown exception")
+    } catch {
+      case e: HoodieException => println("Dropping duplicates with bulk_insert in row writer path is not supported yet")

Review comment:
       can we use `assert` instead of printing a message?
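
       For illustration, a minimal sketch of that suggestion (the expected message substring is an assumption taken from the `println` text in the diff above, not verified against the actual `HoodieException`):

       ```scala
       } catch {
         case e: HoodieException =>
           // assert on the exception message rather than printing it
           assert(e.getMessage.contains("Dropping duplicates with bulk_insert in row writer path is not supported yet"))
       }
       ```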

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org