Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/02/02 04:48:18 UTC

[GitHub] [hudi] alexeykudinkin opened a new pull request, #7825: [DNM] Fixing deduplication in Bulk Insert row-writing path

alexeykudinkin opened a new pull request, #7825:
URL: https://github.com/apache/hudi/pull/7825

   ### Change Logs
   
   TBA
   
   ### Impact
   
   TBA
   
   ### Risk level (write none, low, medium or high below)
   
   Low
   
   ### Documentation Update
   
   N/A
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094187967


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -171,36 +172,30 @@ object HoodieDatasetBulkInsertHelper
     table.getContext.parallelize(writeStatuses.toList.asJava)
   }
 
-  private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
+  private def dedupRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isPartitioned: Boolean): RDD[InternalRow] = {
     val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
     val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
     // NOTE: Pre-combine field could be a nested field
     val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
       .getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema"))
 
     rdd.map { row =>
-        val rowKey = if (isGlobalIndex) {
-          row.getString(recordKeyMetaFieldOrd)
+      val partitionPath = if (isPartitioned) row.getUTF8String(partitionPathMetaFieldOrd) else UTF8String.EMPTY_UTF8
+      val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)
+
+      ((partitionPath, recordKey), row)

Review Comment:
   not copying the `row` here?
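
The question arises because row iterators in Spark commonly hand out the same mutable row object on every step, so buffering references without a copy can alias a single object. A minimal, Spark-free sketch of that hazard follows; `RowReuseSketch`, `reusingIterator`, `withoutCopy` and `withCopy` are hypothetical names, and a one-element `Array[Int]` stands in for a mutable `InternalRow`.

```scala
object RowReuseSketch {
  // Hypothetical source that returns the SAME mutable buffer on every step,
  // standing in for an iterator that reuses one mutable row object
  def reusingIterator(values: Seq[Int]): Iterator[Array[Int]] = {
    val buffer = new Array[Int](1)
    values.iterator.map { v => buffer(0) = v; buffer }
  }

  // Buffering references without copying: every element aliases the one buffer,
  // so all of them show the last value written
  def withoutCopy(values: Seq[Int]): List[Int] =
    reusingIterator(values).toList.map(_(0))

  // Copying each element before buffering preserves the individual values
  def withCopy(values: Seq[Int]): List[Int] =
    reusingIterator(values).map(_.clone()).toList.map(_(0))
}
```

Whether a copy is needed at this exact point in the PR depends on what consumes the rows next; the sketch only shows why the reviewer asks.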



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.sql.types._
 
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
 object HoodieDataTypeUtils {
 
+  /**
+   * Checks whether provided schema contains Hudi's meta-fields
+   *
+   * NOTE: This method validates presence of just one field [[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+   * however assuming that meta-fields should either be omitted or specified in full
+   */
+  def hasMetaFields(structType: StructType): Boolean =
+    structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
+
+  // TODO scala-doc

Review Comment:
   resolve TODO



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.sql.types._
 
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
 object HoodieDataTypeUtils {
 
+  /**
+   * Checks whether provided schema contains Hudi's meta-fields
+   *
+   * NOTE: This method validates presence of just one field [[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+   * however assuming that meta-fields should either be omitted or specified in full
+   */
+  def hasMetaFields(structType: StructType): Boolean =
+    structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
+
+  // TODO scala-doc
+  def addMetaFields(schema: StructType): StructType = {

Review Comment:
   This is more like ensuring the meta fields are placed first in the schema, so the name could be more accurate.
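
A rough sketch of the behavior this comment describes, working on plain field names rather than a Spark `StructType`. The body of `addMetaFields` is not shown in the diff, so `MetaFieldsSketch` and `ensureMetaFieldsFirst` are hypothetical and only illustrate the "meta fields first" reading.

```scala
object MetaFieldsSketch {
  // Hudi's meta-field names, in the order they are assumed to lead the schema
  val metaFields: Seq[String] = Seq(
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name")

  // Hypothetical helper: returns the field names with all meta-fields first,
  // followed by the remaining data fields in their original order.
  // Meta-fields already present anywhere in the input are not duplicated.
  def ensureMetaFieldsFirst(fieldNames: Seq[String]): Seq[String] = {
    val metaSet = metaFields.toSet
    metaFields ++ fieldNames.filterNot(metaSet.contains)
  }
}
```

Applying the helper twice gives the same result as applying it once, which is the property the name "ensure" would suggest.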



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +215,39 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
+   * so that during inevitable shuffling required for de-duplication, we also assign records
+   * into individual Spark partitions in a way affine with target table's physical partitioning
+   * (ie records from the same table's partition will be co-located in the same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
+   *   from the fact that every Spark partition hosts records from many table's partitions
+   *   resulting into every Spark task writing into their own files in these partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val numPartitions: Int,
+                                                       val isPartitioned: Boolean) extends Partitioner {

Review Comment:
   We don't need an additional flag to tell whether the table is partitioned; can we just check `nonEmpty(partitionPath)`?
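
One way the suggestion could look, as a Spark-free sketch. The body of the PR's partitioner is not shown in the diff, so `PartitionerSketch` and `TableAwarePartitioner` are hypothetical, and plain `String` values stand in for `UTF8String`. The key is assumed to be the `(partitionPath, recordKey)` pair built in `dedupRows`.

```scala
object PartitionerSketch {
  // Stand-in for the (partitionPath, recordKey) pair that rows are keyed by
  type Key = (String, String)

  // Hypothetical analogue of a Spark Partitioner: maps a key to a partition id
  final case class TableAwarePartitioner(numPartitions: Int) {
    require(numPartitions > 0, "numPartitions must be positive")

    def getPartition(key: Key): Int = {
      val (partitionPath, recordKey) = key
      // A non-empty partition path means the table is partitioned, so rows of
      // one table partition are co-located; otherwise spread by record key
      val h = if (partitionPath.nonEmpty) partitionPath.hashCode else recordKey.hashCode
      // floorMod keeps the result in [0, numPartitions) for negative hash codes
      java.lang.Math.floorMod(h, numPartitions)
    }
  }
}
```

With this shape the `isPartitioned` constructor argument disappears, at the cost of relying on the empty partition path as the marker for a non-partitioned table.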





[GitHub] [hudi] hudi-bot commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413320825

   ## CI report:
   
   * 1cb6c17f6b37b86f122dd22d8791013e553a8493 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14864) 
   * f526dc51bbd350c7b0c461854dbb65e4070ec531 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14868) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094183999


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala:
##########
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expressi
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, HoodieDataTypeUtils, HoodieInternalRowUtils, SparkSession}

Review Comment:
   nit: optimize imports (`HoodieInternalRowUtils` is not used).



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -188,7 +188,6 @@ trait ProvidesHoodieConfig extends Logging {
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),

Review Comment:
   I guess the intent here was to automatically infer the COMBINE_BEFORE_INSERT config. With this change, it is not enough for the user to just configure the precombine field; they also need to enable COMBINE_BEFORE_INSERT if they want to deduplicate. Is that right?
   
   Is there validation in the code that checks that, if COMBINE_BEFORE_INSERT is enabled, the precombine field is also configured? If not, it would be better to add it as part of the configs improvement story.
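
The kind of validation asked about here could be sketched as follows. This is not Hudi code: `CombineConfigSketch` and `validate` are hypothetical, and the two key strings are assumptions that should be checked against `HoodieWriteConfig.COMBINE_BEFORE_INSERT` and `PRECOMBINE_FIELD` before relying on them.

```scala
object CombineConfigSketch {
  // Assumed config keys; verify against the Hudi version in use
  val CombineBeforeInsertKey = "hoodie.combine.before.insert"
  val PrecombineFieldKey = "hoodie.datasource.write.precombine.field"

  // Hypothetical check: returns Left(message) when de-duplication is requested
  // but no precombine field is configured, Right(()) otherwise
  def validate(opts: Map[String, String]): Either[String, Unit] = {
    val combine = opts.get(CombineBeforeInsertKey).exists(_.trim.equalsIgnoreCase("true"))
    val hasPrecombine = opts.get(PrecombineFieldKey).exists(_.trim.nonEmpty)
    if (combine && !hasPrecombine)
      Left(s"$CombineBeforeInsertKey is enabled but $PrecombineFieldKey is not set")
    else
      Right(())
  }
}
```

Returning an `Either` rather than throwing keeps the sketch easy to test; a real check would more likely raise a configuration exception at write time.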



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +215,39 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
+   * so that during inevitable shuffling required for de-duplication, we also assign records
+   * into individual Spark partitions in a way affine with target table's physical partitioning
+   * (ie records from the same table's partition will be co-located in the same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
+   *   from the fact that every Spark partition hosts records from many table's partitions
+   *   resulting into every Spark task writing into their own files in these partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val numPartitions: Int,

Review Comment:
   I understand the benefit but have we tested it?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.sql.types._
 
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
 object HoodieDataTypeUtils {
 
+  /**
+   * Checks whether provided schema contains Hudi's meta-fields
+   *
+   * NOTE: This method validates presence of just one field [[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+   * however assuming that meta-fields should either be omitted or specified in full
+   */
+  def hasMetaFields(structType: StructType): Boolean =
+    structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
+
+  // TODO scala-doc

Review Comment:
   nit: remove todo?





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094807829


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +215,39 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
+   * so that during inevitable shuffling required for de-duplication, we also assign records
+   * into individual Spark partitions in a way affine with target table's physical partitioning
+   * (ie records from the same table's partition will be co-located in the same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
+   *   from the fact that every Spark partition hosts records from many table's partitions
+   *   resulting into every Spark task writing into their own files in these partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val numPartitions: Int,

Review Comment:
   Yes, this has been tested in our benchmarking run





[GitHub] [hudi] nsivabalan commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1421711754

   @alexeykudinkin: can you close this if it is not valid anymore?




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094809321


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -171,36 +172,30 @@ object HoodieDatasetBulkInsertHelper
     table.getContext.parallelize(writeStatuses.toList.asJava)
   }
 
-  private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
+  private def dedupRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isPartitioned: Boolean): RDD[InternalRow] = {
     val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
     val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
     // NOTE: Pre-combine field could be a nested field
     val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
       .getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema"))
 
     rdd.map { row =>
-        val rowKey = if (isGlobalIndex) {
-          row.getString(recordKeyMetaFieldOrd)
+      val partitionPath = if (isPartitioned) row.getUTF8String(partitionPathMetaFieldOrd) else UTF8String.EMPTY_UTF8
+      val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)
+
+      ((partitionPath, recordKey), row)

Review Comment:
   Not needed anymore (we're doing subsequent shuffling which will do the "copying" for us)
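
For context, the keying in the hunk above can be sketched on plain Scala collections. This is a simplified illustration, not the PR's code: `DedupSketch` and its `Row` case class are hypothetical stand-ins for Spark's `InternalRow`, `groupBy` plus `reduce` stands in for a keyed shuffle, and keeping the row with the larger pre-combine value is an assumption about the reduce step, which the diff does not show.

```scala
object DedupSketch {
  // Hypothetical simplified row; the real code operates on Spark InternalRow
  final case class Row(partitionPath: String, recordKey: String, preCombine: Long, payload: String)

  // Keys each row by (partitionPath, recordKey), using an empty partition path
  // for non-partitioned tables, and keeps one row per key
  def dedupRows(rows: Seq[Row], isPartitioned: Boolean): Seq[Row] =
    rows
      .groupBy(r => (if (isPartitioned) r.partitionPath else "", r.recordKey))
      .values
      // Assumed merge rule: the row with the larger pre-combine value wins
      .map(_.reduce((a, b) => if (b.preCombine > a.preCombine) b else a))
      .toSeq
}
```

With `isPartitioned = true`, the same record key in two table partitions yields two surviving rows; with `false`, the partition path is ignored and only one row survives.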





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094809321


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -171,36 +172,30 @@ object HoodieDatasetBulkInsertHelper
     table.getContext.parallelize(writeStatuses.toList.asJava)
   }
 
-  private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
+  private def dedupRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isPartitioned: Boolean): RDD[InternalRow] = {
     val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
     val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
     // NOTE: Pre-combine field could be a nested field
     val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
       .getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema"))
 
     rdd.map { row =>
-        val rowKey = if (isGlobalIndex) {
-          row.getString(recordKeyMetaFieldOrd)
+      val partitionPath = if (isPartitioned) row.getUTF8String(partitionPathMetaFieldOrd) else UTF8String.EMPTY_UTF8
+      val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)
+
+      ((partitionPath, recordKey), row)

Review Comment:
   Not needed anymore (we're doing subsequent shuffling, which spares us the need to copy)





[GitHub] [hudi] hudi-bot commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1414102302

   ## CI report:
   
   * f526dc51bbd350c7b0c461854dbb65e4070ec531 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14868) 
   * a797990c2cb99dbc4838f450064a972f157721ba Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14877) 
   


[GitHub] [hudi] hudi-bot commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1414090734

   ## CI report:
   
   * f526dc51bbd350c7b0c461854dbb65e4070ec531 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14868) 
   * a797990c2cb99dbc4838f450064a972f157721ba UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #7825: [DNM] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413159606

   ## CI report:
   
   * db6d970776ab0dcf2f5b29c54137bd0796202077 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #7825: [DNM] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413225019

   ## CI report:
   
   * db6d970776ab0dcf2f5b29c54137bd0796202077 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14860) 
   * 1cb6c17f6b37b86f122dd22d8791013e553a8493 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #7825: [DNM] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413165542

   ## CI report:
   
   * db6d970776ab0dcf2f5b29c54137bd0796202077 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14860) 
   


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094811296


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.sql.types._
 
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
 object HoodieDataTypeUtils {
 
+  /**
+   * Checks whether provided schema contains Hudi's meta-fields
+   *
+   * NOTE: This method validates presence of just one field [[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+   * however assuming that meta-fields should either be omitted or specified in full
+   */
+  def hasMetaFields(structType: StructType): Boolean =
+    structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
+
+  // TODO scala-doc
+  def addMetaFields(schema: StructType): StructType = {

Review Comment:
   This is a relocated method. Keeping the name for compatibility





[GitHub] [hudi] xushiyan commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094213438


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -188,7 +188,6 @@ trait ProvidesHoodieConfig extends Logging {
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),

Review Comment:
   Can you clarify why it is being removed in this PR, though?





[GitHub] [hudi] hudi-bot commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413311918

   ## CI report:
   
   * db6d970776ab0dcf2f5b29c54137bd0796202077 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14860) 
   * 1cb6c17f6b37b86f122dd22d8791013e553a8493 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14864) 
   * f526dc51bbd350c7b0c461854dbb65e4070ec531 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] nsivabalan commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094947575


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -62,23 +64,18 @@ object HoodieDatasetBulkInsertHelper
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
                            shouldDropPartitionColumns: Boolean): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
-    val schema = df.schema
-
-    val metaFields = Seq(
-      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
-      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
 
-    val updatedSchema = StructType(metaFields ++ schema.fields)
+    val schema = df.schema
+    val populatedSchema = addMetaFields(schema)
 
     val updatedDF = if (populateMetaFields) {
       val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
         "Key-generator class name is required")
-
-      val prependedRdd: RDD[InternalRow] =
-        df.queryExecution.toRdd.mapPartitions { iter =>
+      val sourceRdd = df.queryExecution.toRdd
+      val populatedRdd: RDD[InternalRow] = if (hasMetaFields(schema)) {

Review Comment:
   Is this for the clustering row-writer code path?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala:
##########
@@ -18,10 +18,29 @@
 
 package org.apache.spark.sql
 
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.sql.types._
 
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
+
 object HoodieDataTypeUtils {
 
+  /**
+   * Checks whether provided schema contains Hudi's meta-fields
+   *
+   * NOTE: This method validates presence of just one field [[HoodieRecord.RECORD_KEY_METADATA_FIELD]],
+   * however assuming that meta-fields should either be omitted or specified in full
+   */
+  def hasMetaFields(structType: StructType): Boolean =
+    structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined

Review Comment:
   Minor: should we check for the partition path as well?
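
   To make the suggestion concrete, here is a minimal sketch of a stricter check that requires every meta-field rather than only the record key. It is not the PR's code: `SimpleSchema` is a hypothetical stand-in for Spark's `StructType` (field names only) so the snippet runs without Spark, and the field-name strings are assumed to match the `HoodieRecord.*_METADATA_FIELD` constants.

```scala
// Hypothetical stand-in for Spark's StructType: only field names matter here.
final case class SimpleSchema(fieldNames: Seq[String])

object MetaFieldsCheck {
  // Assumed values of the HoodieRecord.*_METADATA_FIELD constants.
  val MetaFields: Seq[String] = Seq(
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name")

  // Mirrors the check in the diff: presence of the record-key field only.
  def hasRecordKeyMetaField(schema: SimpleSchema): Boolean =
    schema.fieldNames.contains("_hoodie_record_key")

  // Stricter variant: every meta-field (partition path included) must be present.
  def hasAllMetaFields(schema: SimpleSchema): Boolean =
    MetaFields.forall(f => schema.fieldNames.contains(f))
}
```

   The stricter variant would reject a schema that carries only some of the meta-fields, at the cost of a few extra lookups per call.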



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +214,41 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
+   * so that during inevitable shuffling required for de-duplication, we also assign records
+   * into individual Spark partitions in a way affine with target table's physical partitioning
+   * (ie records from the same table's partition will be co-located in the same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
+   *   from the fact that every Spark partition hosts records from many table's partitions
+   *   resulting into every Spark task writing into their own files in these partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val numPartitions: Int) extends Partitioner {
+    override def getPartition(key: Any): Int = {
+      key match {
+        case null => 0
+        case (partitionPath, recordKey) =>

Review Comment:
   Won't this result in data skew? If one of the Hudi partitions has a lot of data, the corresponding Spark partition will skew the total de-dup time, right?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +214,41 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
+   * so that during inevitable shuffling required for de-duplication, we also assign records
+   * into individual Spark partitions in a way affine with target table's physical partitioning
+   * (ie records from the same table's partition will be co-located in the same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
+   *   from the fact that every Spark partition hosts records from many table's partitions
+   *   resulting into every Spark task writing into their own files in these partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val numPartitions: Int) extends Partitioner {
+    override def getPartition(key: Any): Int = {
+      key match {
+        case null => 0
+        case (partitionPath, recordKey) =>

Review Comment:
   Avoiding data skew was one of the reasons we did not go with this approach.
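
   The skew concern can be illustrated with a small, Spark-free sketch. It is an assumption-laden model, not the partitioner from this PR: `byPartitionPath` and `byPathAndKey` are hypothetical routing functions, and the record counts are made up for illustration. It compares the largest Spark partition produced when routing only by the table partition path against routing by the full `(partitionPath, recordKey)` pair.

```scala
object SkewSketch {
  // Non-negative modulo, since hashCode may be negative.
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hypothetical routing: only the table partition path decides the Spark partition.
  def byPartitionPath(partitionPath: String, numPartitions: Int): Int =
    nonNegativeMod(partitionPath.hashCode, numPartitions)

  // Hypothetical routing: the full (partitionPath, recordKey) pair is hashed.
  def byPathAndKey(partitionPath: String, recordKey: String, numPartitions: Int): Int =
    nonNegativeMod((partitionPath, recordKey).hashCode, numPartitions)

  // Size of the largest Spark partition under the given routing function.
  def maxTaskSize(records: Seq[(String, String)], route: (String, String) => Int): Int =
    records.groupBy { case (path, key) => route(path, key) }.values.map(_.size).max

  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    // Made-up workload: 1000 records in one "hot" table partition, 10 in a "cold" one.
    val records = (1 to 1000).map(i => ("2023/02/01", s"key-$i")) ++
      (1 to 10).map(i => ("2023/02/02", s"key-$i"))

    val skewed = maxTaskSize(records, (p, _) => byPartitionPath(p, numPartitions))
    val spread = maxTaskSize(records, (p, k) => byPathAndKey(p, k, numPartitions))
    println(s"routing by partition path: largest task holds $skewed records")
    println(s"routing by path and key:   largest task holds $spread records")
  }
}
```

   Routing by path alone keeps each table partition's records co-located, which is what limits the number of files written, but a single hot partition then lands entirely on one task. Hashing the pair spreads the load, while every task may end up writing into many table partitions.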





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094807426


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -188,7 +188,6 @@ trait ProvidesHoodieConfig extends Logging {
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),

Review Comment:
   @codope this was just a back-stop to keep things running during testing. It is addressed properly in #7813, so this change is reverted.





[GitHub] [hudi] hudi-bot commented on pull request #7825: [DNM] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413230305

   ## CI report:
   
   * db6d970776ab0dcf2f5b29c54137bd0796202077 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14860) 
   * 1cb6c17f6b37b86f122dd22d8791013e553a8493 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14864) 
   




[GitHub] [hudi] alexeykudinkin commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1421729790

   This is still a valid scenario when someone uses NONE as the partitioner.




[GitHub] [hudi] hudi-bot commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1414302742

   ## CI report:
   
   * a797990c2cb99dbc4838f450064a972f157721ba Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14877) 
   




[GitHub] [hudi] hudi-bot commented on pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7825:
URL: https://github.com/apache/hudi/pull/7825#issuecomment-1413554881

   ## CI report:
   
   * f526dc51bbd350c7b0c461854dbb65e4070ec531 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14868) 
   




[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7825: [HUDI-5685] Fixing deduplication in Bulk Insert row-writing path

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7825:
URL: https://github.com/apache/hudi/pull/7825#discussion_r1094810752


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -220,4 +215,39 @@ object HoodieDatasetBulkInsertHelper
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  /**
+   * We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
+   * so that during inevitable shuffling required for de-duplication, we also assign records
+   * into individual Spark partitions in a way affine with target table's physical partitioning
+   * (ie records from the same table's partition will be co-located in the same Spark's partition)
+   *
+   * This would allow us to
+   * <ul>
+   *   <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
+   *   <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
+   *   from the fact that every Spark partition hosts records from many table's partitions
+   *   resulting into every Spark task writing into their own files in these partitions (in
+   *   case no subsequent re-partitioning is performed)
+   *   </li>
+   * <ul>
+   *
+   * For more details check out HUDI-5685
+   */
+  private case class TablePartitioningAwarePartitioner(override val numPartitions: Int,
+                                                       val isPartitioned: Boolean) extends Partitioner {

Review Comment:
   Good point


