You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "xushiyan (via GitHub)" <gi...@apache.org> on 2023/03/01 00:06:27 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #7951: [HUDI-5796] Adding auto inferring partition from incoming df

xushiyan commented on code in PR #7951:
URL: https://github.com/apache/hudi/pull/7951#discussion_r1120954001


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -288,44 +287,42 @@ object DataSourceWriteOptions {
     .withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
 
   /**
-    * Translate spark parameters to hudi parameters
-    *
-    * @param optParams Parameters to be translated
-    * @return Parameters after translation
-    */
-  def translateSqlOptions(optParams: Map[String, String]): Map[String, String] = {
-    var translatedOptParams = optParams
-    // translate the api partitionBy of spark DataFrameWriter to PARTITIONPATH_FIELD
-    // we should set hoodie's partition path only if its not set by the user.
-    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
-      && !optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
-      val partitionColumns = optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
-        .map(SparkDataSourceUtils.decodePartitioningColumns)
-        .getOrElse(Nil)
-      val keyGeneratorClass = optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
-        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
-
-      keyGeneratorClass match {
-        // CustomKeyGenerator needs special treatment, because it needs to be specified in a way
-        // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
-        // partitionBy can specify the partition like this: partitionBy("p1", "p2:SIMPLE", "p3:TIMESTAMP")
-        case c if (c.nonEmpty && c == classOf[CustomKeyGenerator].getName) =>
-          val partitionPathField = partitionColumns.map(e => {
-            if (e.contains(":")) {
-              e
-            } else {
-              s"$e:SIMPLE"
-            }
-          }).mkString(",")
-          translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> partitionPathField)
-        case c if (c.isEmpty || !keyGeneratorClass.equals(classOf[NonpartitionedKeyGenerator].getName)) =>
-          // for any key gen other than NonPartitioned key gen, we can override the partition field config.
-          val partitionPathField = partitionColumns.mkString(",")
-          translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> partitionPathField)
-        case _ => // no op incase of NonPartitioned Key gen.
-      }
+   * Derive [[PARTITIONPATH_FIELD]] based on [[SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY]]
+   * if [[PARTITIONPATH_FIELD]] is not set explicitly.
+   */
+  def derivePartitionPathFieldsIfNeeded(optParams: Map[String, String]): Map[String, String] = {
+    if (!optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+      || optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      return optParams
+    }
+
+    val partitionColumns = optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+      .map(SparkDataSourceUtils.decodePartitioningColumns)
+      .getOrElse(Nil)
+    val keyGeneratorClass = optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
+
+    keyGeneratorClass match {
+      // CustomKeyGenerator needs special treatment, because it needs to be specified in a way
+      // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
+      // partitionBy can specify the partition like this: partitionBy("p1", "p2:SIMPLE", "p3:TIMESTAMP")
+      case c if Array(classOf[CustomKeyGenerator].getName, classOf[CustomAvroKeyGenerator].getName).contains(c) =>
+        val partitionPathField = partitionColumns.map(e => {
+          if (e.contains(":")) {
+            e
+          } else {
+            s"$e:SIMPLE"
+          }
+        }).mkString(",")
+        optParams ++ Map(PARTITIONPATH_FIELD.key -> partitionPathField)
+      case c if !Array(classOf[NonpartitionedKeyGenerator].getName, classOf[NonpartitionedAvroKeyGenerator].getName).contains(c) =>

Review Comment:
   This is not the ideal way to check the key generator type. The key generator class itself should tell us whether it is non-partitioned, and the same applies to the custom key generator classes. The key generator factory could provide a utility method for this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org