You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/25 08:05:14 UTC

[GitHub] [hudi] yihua commented on a change in pull request #4441: [HUDI-3085] improve bulk insert partitioner abstraction

yihua commented on a change in pull request #4441:
URL: https://github.com/apache/hudi/pull/4441#discussion_r814506957



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+  private WriteHandleFactory defaultWriteHandleFactory;
+  private List<String> fileIdPfx;
 
   /**
-   * Repartitions the input records into at least expected number of output spark partitions.
+   * Repartitions the input records into at least expected number of output spark partitions,
+   * and generates fileIdPfx for each partition.
    *
    * @param records               Input Hoodie records
    * @param outputSparkPartitions Expected number of output partitions
    * @return
    */
-  I repartitionRecords(I records, int outputSparkPartitions);
+  public abstract I repartitionRecords(I records, int outputSparkPartitions);
 
   /**
    * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
    */
-  boolean arePartitionRecordsSorted();
+  public abstract boolean arePartitionRecordsSorted();
+
+  public List<String> getFileIdPfx() {
+    return fileIdPfx;
+  }
+
+  public void setDefaultWriteHandleFactory(WriteHandleFactory defaultWriteHandleFactory) {
+    this.defaultWriteHandleFactory = defaultWriteHandleFactory;
+  }
+
+  /**
+   * Return write handle factory for the given partition.
+   * By default, return the pre-assigned write handle factory for all partitions
+   * @param partition data partition
+   * @return
+   */
+  public WriteHandleFactory getWriteHandleFactory(int partition) {
+    return defaultWriteHandleFactory;
+  }
+
+  /**
+   * Initialize a list of file id prefix randomly.
+   * In most cases, bulk_insert put all incoming records to randomly generated file groups (i.e., the current default implementation).

Review comment:
       nit: "randomly generated file groups"
   -> "randomly generated file group ID prefixes"

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {

Review comment:
       This interface is public and users may implement their own bulk insert partitioner as a plugin. The change from interface to abstract class is not backward compatible. Could you keep it as an interface and use `default` methods for new logic?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+  private WriteHandleFactory defaultWriteHandleFactory;
+  private List<String> fileIdPfx;
 
   /**
-   * Repartitions the input records into at least expected number of output spark partitions.
+   * Repartitions the input records into at least expected number of output spark partitions,
+   * and generates fileIdPfx for each partition.
    *
    * @param records               Input Hoodie records
    * @param outputSparkPartitions Expected number of output partitions
    * @return
    */
-  I repartitionRecords(I records, int outputSparkPartitions);
+  public abstract I repartitionRecords(I records, int outputSparkPartitions);
 
   /**
    * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
    */
-  boolean arePartitionRecordsSorted();
+  public abstract boolean arePartitionRecordsSorted();
+
+  public List<String> getFileIdPfx() {
+    return fileIdPfx;
+  }
+
+  public void setDefaultWriteHandleFactory(WriteHandleFactory defaultWriteHandleFactory) {

Review comment:
       Should the `setDefaultWriteHandleFactory()` functionality be implemented through the constructor, with the `defaultWriteHandleFactory` passed in?  e.g., 
   ```
   public GlobalSortPartitioner(WriteHandleFactory defaultWriteHandleFactory);
   ```
   

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
##########
@@ -41,27 +41,24 @@
   private boolean areRecordsSorted;
   private HoodieWriteConfig config;
   private HoodieTable hoodieTable;
-  private List<String> fileIDPrefixes;
   private boolean useWriterSchema;
-  private WriteHandleFactory writeHandleFactory;
+  private BulkInsertPartitioner partitioner;
 
   public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
                                HoodieWriteConfig config, HoodieTable hoodieTable,
-                               List<String> fileIDPrefixes, boolean useWriterSchema,
-                               WriteHandleFactory writeHandleFactory) {
+                               boolean useWriterSchema, BulkInsertPartitioner partitioner) {
     this.instantTime = instantTime;
     this.areRecordsSorted = areRecordsSorted;
     this.config = config;
     this.hoodieTable = hoodieTable;
-    this.fileIDPrefixes = fileIDPrefixes;
     this.useWriterSchema = useWriterSchema;
-    this.writeHandleFactory = writeHandleFactory;
+    this.partitioner = partitioner;
   }
 
   @Override
   public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
     return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
-        fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
-        writeHandleFactory);
+        (String)partitioner.getFileIdPfx().get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,

Review comment:
       It's better to apply the partition ID -> file ID prefix mapping here if the partitioner just stores the function.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+  private WriteHandleFactory defaultWriteHandleFactory;
+  private List<String> fileIdPfx;

Review comment:
       nit: rename to `fileIdPrefixList`

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
##########
@@ -18,24 +18,64 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public abstract class BulkInsertPartitioner<I> implements Serializable {
+
+  private WriteHandleFactory defaultWriteHandleFactory;
+  private List<String> fileIdPfx;

Review comment:
       After looking at this PR as a whole, I'm thinking that it may be better to store generating functions for partitionId -> fileIdPrefix and partitionId -> writeHandleFactory, and have those functions passed in through the constructor.
   
   Do you have a PoC of a BulkInsertPartitioner implementation that provides partition-specific file IDs and write handle factories?  I'd like to understand how these are coupled with the repartition logic and how the interface design can accommodate the use case.

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
##########
@@ -64,7 +65,11 @@ public JavaSortAndSizeExecutionStrategy(HoodieTable table,
     props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
     props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+
+    BulkInsertPartitioner partitioner = getPartitioner(strategyParams, schema);
+    partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(preserveHoodieMetadata));

Review comment:
       This can be achieved through the constructor.

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
##########
@@ -50,6 +50,7 @@ public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boo
   @Override
   public List<HoodieRecord<T>> repartitionRecords(
       List<HoodieRecord<T>> records, int outputSparkPartitions) {
+    generateFileIdPfx(outputSparkPartitions);

Review comment:
       Wondering if this can be achieved by a function (`func`) passed to the constructor, with logic something like `IntStream.range(0, outputSparkPartitions).mapToObj(i -> func.apply(i))`?

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
##########
@@ -117,7 +116,7 @@ public static JavaBulkInsertHelper newInstance() {
     new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
         config, instantTime, table,
         fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
-        new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+        partitioner.getWriteHandleFactory(0)).forEachRemaining(writeStatuses::addAll);

Review comment:
       This looks hacky: the partition index passed to `getWriteHandleFactory()` is hard-coded to `0`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org