You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by la...@apache.org on 2021/02/25 06:33:33 UTC

[hudi] branch master updated: [HUDI-1638] Some improvements to BucketAssignFunction (#2600)

This is an automated email from the ASF dual-hosted git repository.

lamberken pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 06dc7c7  [HUDI-1638] Some improvements to BucketAssignFunction (#2600)
06dc7c7 is described below

commit 06dc7c7fd8a867a1e1da90f7dc19b0cc2da69bba
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 25 14:33:21 2021 +0800

    [HUDI-1638] Some improvements to BucketAssignFunction (#2600)
    
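    - `#initializeState` executes before `#open`, so
      `#checkPartitionsLoaded` could see a null `initialPartitionsToLoad`;
      the check now runs in `#open`, right after the partitions are loaded
    - Only load index records for partitions that already existed when
      the task started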
---
 .../operator/partitioner/BucketAssignFunction.java | 39 ++++++++++++++--------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
index 70289b7..8ce7f40 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
@@ -55,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -108,7 +109,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
    * All the partition paths when the task starts. It is used to help checking whether all the partitions
    * are loaded into the state.
    */
-  private transient List<String> initialPartitionsToLoad;
+  private transient Set<String> initialPartitionsToLoad;
 
   /**
    * State to book-keep which partition is loaded into the index state {@code indexState}.
@@ -136,15 +137,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
         new SerializableConfiguration(this.hadoopConf),
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = new BucketAssigner(context, writeConfig);
-    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
-        this.conf.getString(FlinkOptions.PATH), false, false, false);
-    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
-    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
-    // reference: org.apache.flink.streaming.api.datastream.KeyedStream
-    this.initialPartitionsToLoad = allPartitionPaths.stream()
-        .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
-        .collect(Collectors.toList());
+
+    // initialize and check the partitions load state
+    loadInitialPartitions();
+    checkPartitionsLoaded();
   }
 
   @Override
@@ -163,9 +159,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     MapStateDescriptor<String, Integer> partitionLoadStateDesc =
         new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
     partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
-    if (context.isRestored()) {
-      checkPartitionsLoaded();
-    }
   }
 
   @SuppressWarnings("unchecked")
@@ -178,7 +171,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     final HoodieKey hoodieKey = record.getKey();
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
-    if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+    if (!allPartitionsLoaded
+        && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition
+        && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
       // If the partition records are never loaded, load the records first.
       loadRecords(hoodieKey.getPartitionPath());
     }
@@ -245,6 +240,21 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   }
 
   /**
+   * Loads the existing partitions for this task.
+   */
+  private void loadInitialPartitions() {
+    List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+        this.conf.getString(FlinkOptions.PATH), false, false, false);
+    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+    final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+    // reference: org.apache.flink.streaming.api.datastream.KeyedStream
+    this.initialPartitionsToLoad = allPartitionPaths.stream()
+        .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
+        .collect(Collectors.toSet());
+  }
+
+  /**
    * Checks whether all the partitions of the table are loaded into the state,
    * set the flag {@code allPartitionsLoaded} to true if it is.
    */
@@ -271,6 +281,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   public void clearIndexState() {
     this.allPartitionsLoaded = false;
     this.indexState.clear();
+    loadInitialPartitions();
   }
 
   @VisibleForTesting