Posted to commits@hudi.apache.org by "xushiyan (via GitHub)" <gi...@apache.org> on 2023/02/21 22:00:26 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #7951: [HUDI-5796] Adding auto inferring partition from incoming df

xushiyan commented on code in PR #7951:
URL: https://github.com/apache/hudi/pull/7951#discussion_r1113598296


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1018,6 +1023,26 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  private def mayBeInferPartition(rawParams: Map[String, String]): Map[String, String] = {

Review Comment:
   /nit `maybeInferPartition()`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -95,14 +96,16 @@ object HoodieSparkSqlWriter {
     ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
       .defaultValue(false)
 
+  val SPARK_DF_PARTITION_COLUMN_NAME = "__partition_columns"

Review Comment:
   can you add a doc comment to explain where this special constant comes from?
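   
   For reference, a minimal sketch of the kind of doc the comment is asking for; tying the constant to Spark's `DataFrameWriter.partitionBy()` plumbing (`DataSourceUtils.PARTITIONING_COLUMNS_KEY`) is an assumption, not something confirmed in this PR:
   
       /**
        * Option key under which Spark passes the DataFrameWriter.partitionBy() columns down to
        * the data source. Assumed to mirror Spark's internal DataSourceUtils.PARTITIONING_COLUMNS_KEY;
        * the value is either a single column name or a bracketed, quoted list such as ["col1","col2"].
        */
       val SPARK_DF_PARTITION_COLUMN_NAME = "__partition_columns"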



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -111,15 +114,17 @@ object HoodieSparkSqlWriter {
             extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
-    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
-    val path = optParams("path")
+    assert(rawParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
+    val path = rawParams("path")
     val basePath = new Path(path)
 
     val spark = sqlContext.sparkSession
     val sparkContext = sqlContext.sparkContext
 
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
+    var optParams = mayBeInferPartition(rawParams)

Review Comment:
   minor: `optParams` sounds more intuitive, since it indicates the map comes from `option()`. You can call the result `finalOptParams` after making the inference.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1018,6 +1023,26 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  private def mayBeInferPartition(rawParams: Map[String, String]): Map[String, String] = {
+    var optParams = rawParams
+    // if hoodie's partition path field is not set and incoming df's partition is set, infer from it.
+    if (!rawParams.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) && optParams.containsKey(SPARK_DF_PARTITION_COLUMN_NAME)){
+      val partitionCols : String =  optParams.get(SPARK_DF_PARTITION_COLUMN_NAME).get
+      val partitionFieldValue : String = if (partitionCols.startsWith("[")) {
+        val parts : Array[String] = partitionCols.substring(1, partitionCols.length-1).split(",")
+        var partitionFieldStr = ""
+        parts.foreach(part => {
+          partitionFieldStr += part.substring(1, part.length-1) + ","
+        })
+        partitionFieldStr.substring(0, partitionFieldStr.length - 1)
+      } else {
+        partitionCols
+      }

Review Comment:
   should use pattern matching to handle all cases, including invalid ones
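   
   A minimal sketch of what that could look like; the helper name and the handling of malformed values are illustrative assumptions, not the PR's final code:
   
       // Hypothetical helper: normalize the partitionBy value by matching on its shape,
       // failing fast on empty/unexpected input instead of assuming it is well formed.
       private def parsePartitionColumns(partitionCols: String): String = {
         Option(partitionCols).map(_.trim) match {
           case None | Some("") =>
             throw new IllegalArgumentException("Partition columns value is empty")
           case Some(s) if s.startsWith("[") && s.endsWith("]") =>
             // bracketed, quoted list such as ["col1","col2"]
             s.substring(1, s.length - 1)
               .split(",")
               .map(_.trim.stripPrefix("\"").stripSuffix("\""))
               .filter(_.nonEmpty)
               .mkString(",")
           case Some(s) =>
             // a single, plain column name
             s
         }
       }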



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -142,6 +145,61 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
   }
 
+  @Test
+  def testInferPartitionBy(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, Map())
+      // Insert Operation
+      val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+      val commonOptsNoPreCombine = Map(
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+        HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+      ) ++ writeOpts
+
+      inputDF.write.partitionBy("partition").format("hudi")
+        .options(commonOptsNoPreCombine)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Overwrite)
+        .save(basePath)
+
+    val snapshot0 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    snapshot0.cache()
+    assertEquals(100, snapshot0.count())
+
+    // verify partition cols
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count() > 0)
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH + "'").count() > 0)
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'").count() > 0)

Review Comment:
   assert the physical partition path too?
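   
   A minimal sketch of what that extra check could look like, assuming the test class exposes the `fs` and `basePath` fixtures and that `org.apache.hadoop.fs.Path` is already imported:
   
       // Hypothetical addition: verify the partition directories exist on storage,
       // not just that _hoodie_partition_path carries the expected values.
       Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
           HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
           HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH).foreach { partition =>
         assertTrue(fs.exists(new Path(basePath, partition)), s"missing partition dir: $partition")
       }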



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -142,6 +145,61 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
   }
 
+  @Test
+  def testInferPartitionBy(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, Map())
+      // Insert Operation
+      val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+      val commonOptsNoPreCombine = Map(
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+        HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+      ) ++ writeOpts
+
+      inputDF.write.partitionBy("partition").format("hudi")
+        .options(commonOptsNoPreCombine)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Overwrite)
+        .save(basePath)
+
+    val snapshot0 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    snapshot0.cache()
+    assertEquals(100, snapshot0.count())
+
+    // verify partition cols
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count() > 0)
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH + "'").count() > 0)
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'").count() > 0)
+
+    // try w/ multi field partition paths
+    // generate two batches of df w/ diff partition path values.
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    val records2 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    // hard code the value for rider and fare so that we can verify the partitions paths with hudi
+    val toInsertDf = inputDF1.withColumn("fare",lit(100)).withColumn("rider",lit("rider-123"))
+      .union(inputDF2.withColumn("fare",lit(200)).withColumn("rider",lit("rider-456")))
+
+    toInsertDf.write.partitionBy("fare","rider").format("hudi")
+      .options(commonOptsNoPreCombine)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshot1 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    snapshot1.cache()
+    assertEquals(200, snapshot1.count())
+
+    val partitionPaths = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), HoodieMetadataConfig.newBuilder().build(), basePath)
+    partitionPaths.foreach(entry => println("partition path :: " + entry))
+    assertTrue(partitionPaths.contains("100/rider-123"))
+    assertTrue(partitionPaths.contains("200/rider-456"))

Review Comment:
   assert the partition col value too?
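   
   A minimal sketch of what that could look like, assuming the partition columns are kept in the data files and read back as-is:
   
       // Hypothetical addition: check that the partition column values round-trip,
       // matching the hard-coded fare/rider values used above.
       assertEquals(100, snapshot1.filter("fare = 100 and rider = 'rider-123'").count())
       assertEquals(100, snapshot1.filter("fare = 200 and rider = 'rider-456'").count())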



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -142,6 +145,61 @@ class TestCOWDataSource extends HoodieClientTestBase with ScalaAssertionSupport
     spark.read.format("org.apache.hudi").options(readOpts).load(basePath).count()
   }
 
+  @Test
+  def testInferPartitionBy(): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO, Map())
+      // Insert Operation
+      val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+      val commonOptsNoPreCombine = Map(
+        "hoodie.insert.shuffle.parallelism" -> "4",
+        "hoodie.upsert.shuffle.parallelism" -> "4",
+        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+        HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+      ) ++ writeOpts
+
+      inputDF.write.partitionBy("partition").format("hudi")
+        .options(commonOptsNoPreCombine)
+        .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Overwrite)
+        .save(basePath)
+
+    val snapshot0 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    snapshot0.cache()
+    assertEquals(100, snapshot0.count())
+
+    // verify partition cols
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count() > 0)
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH + "'").count() > 0)
+    assertTrue(snapshot0.filter("_hoodie_partition_path = '" + HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'").count() > 0)
+
+    // try w/ multi field partition paths
+    // generate two batches of df w/ diff partition path values.
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    val records2 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    // hard code the value for rider and fare so that we can verify the partitions paths with hudi
+    val toInsertDf = inputDF1.withColumn("fare",lit(100)).withColumn("rider",lit("rider-123"))
+      .union(inputDF2.withColumn("fare",lit(200)).withColumn("rider",lit("rider-456")))
+
+    toInsertDf.write.partitionBy("fare","rider").format("hudi")
+      .options(commonOptsNoPreCombine)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshot1 = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    snapshot1.cache()
+    assertEquals(200, snapshot1.count())
+
+    val partitionPaths = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), HoodieMetadataConfig.newBuilder().build(), basePath)

Review Comment:
   the parent test harness class already provides jsc, engine context, right?
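   
   If so (assuming the harness exposes an engine `context` field, which I have not verified here), this could shrink to something like:
   
       // Hypothetical simplification: reuse the harness-provided engine context
       // instead of constructing a new JavaSparkContext/HoodieSparkEngineContext here.
       val partitionPaths = FSUtils.getAllPartitionPaths(context, HoodieMetadataConfig.newBuilder().build(), basePath)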



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
