You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/29 05:03:14 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6821: [HUDI-4237] Fixing empty partition-values being sync'd to HMS

xushiyan commented on code in PR #6821:
URL: https://github.com/apache/hudi/pull/6821#discussion_r983066569


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala:
##########
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       None
     }
   }
-  // The origin key generator class for this table.
-  private lazy val originKeyGen = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      val keyGenProps = new TypedProperties()
-      keyGenProps.putAll(props)
-      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
-      val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
-      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
-    } else {
-      None
+
+  private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+  private lazy val originalKeyGen =
+    Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+      .map { originalKeyGenClassName =>
+        checkArgument(originalKeyGenClassName.nonEmpty)
+
+        val convertedKeyGenClassName = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+        val keyGenProps = new TypedProperties()
+        keyGenProps.putAll(props)
+        keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+        keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
+
+        KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+      }
+
+  override def getRecordKey(record: GenericRecord): String =
+    originalKeyGen.map {
+      _.getKey(record).getRecordKey
+    } getOrElse {
+      complexKeyGen.getRecordKey(record)
     }
+
+  override def getRecordKey(row: Row): String =
+    originalKeyGen.map {
+      _.getRecordKey(row)
+    } getOrElse {
+      complexKeyGen.getRecordKey(row)
+    }
+
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String =
+    originalKeyGen.map {
+      _.getRecordKey(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getRecordKey(internalRow, schema)
+    }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getKey(record).getPartitionPath
+    } getOrElse {
+      complexKeyGen.getPartitionPath(record)
+    }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
   }
 
-  override def getRecordKey(record: GenericRecord): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.getKey(record).getRecordKey
-    } else {
-      super.getRecordKey(record)
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(row)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(row)
     }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
   }
 
-  override def getRecordKey(row: Row): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
-    } else {
-      super.getRecordKey(row)
+  override def getPartitionPath(internalRow: InternalRow, schema: StructType): UTF8String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(internalRow, schema)
+    }
+
+    UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString, rowType = true))
+  }
+
+  override def getRecordKeyFieldNames: util.List[String] = {
+    originalKeyGen.map(_.getRecordKeyFieldNames)
+      .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+  }
+
+  override def getPartitionPathFields: util.List[String] = {
+    originalKeyGen.map {
+      case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+      case _ =>
+        Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])

Review Comment:
   I don't quite understand this part: `super.getPartitionPathFields` also resolves to `BaseKeyGenerator`'s implementation. Could you please clarify?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org