You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by la...@apache.org on 2021/02/25 06:33:33 UTC
[hudi] branch master updated: [HUDI-1638] Some improvements to
BucketAssignFunction (#2600)
This is an automated email from the ASF dual-hosted git repository.
lamberken pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 06dc7c7 [HUDI-1638] Some improvements to BucketAssignFunction (#2600)
06dc7c7 is described below
commit 06dc7c7fd8a867a1e1da90f7dc19b0cc2da69bba
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Feb 25 14:33:21 2021 +0800
[HUDI-1638] Some improvements to BucketAssignFunction (#2600)
- #initializeState executes before #open, so
#checkPartitionsLoaded may see a null `initialPartitionsToLoad`
- Only load the existing partitions
---
.../operator/partitioner/BucketAssignFunction.java | 39 ++++++++++++++--------
1 file changed, 25 insertions(+), 14 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
index 70289b7..8ce7f40 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java
@@ -55,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -108,7 +109,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
* All the partition paths when the task starts. It is used to help checking whether all the partitions
* are loaded into the state.
*/
- private transient List<String> initialPartitionsToLoad;
+ private transient Set<String> initialPartitionsToLoad;
/**
* State to book-keep which partition is loaded into the index state {@code indexState}.
@@ -136,15 +137,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = new BucketAssigner(context, writeConfig);
- List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
- this.conf.getString(FlinkOptions.PATH), false, false, false);
- final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
- final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
- final int taskID = getRuntimeContext().getIndexOfThisSubtask();
- // reference: org.apache.flink.streaming.api.datastream.KeyedStream
- this.initialPartitionsToLoad = allPartitionPaths.stream()
- .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
- .collect(Collectors.toList());
+
+ // initialize and check the partitions load state
+ loadInitialPartitions();
+ checkPartitionsLoaded();
}
@Override
@@ -163,9 +159,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
- if (context.isRestored()) {
- checkPartitionsLoaded();
- }
}
@SuppressWarnings("unchecked")
@@ -178,7 +171,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
final HoodieKey hoodieKey = record.getKey();
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
- if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+ if (!allPartitionsLoaded
+ && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition
+ && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
}
@@ -245,6 +240,21 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
}
/**
+ * Loads the existing partitions for this task.
+ */
+ private void loadInitialPartitions() {
+ List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
+ this.conf.getString(FlinkOptions.PATH), false, false, false);
+ final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+ final int taskID = getRuntimeContext().getIndexOfThisSubtask();
+ // reference: org.apache.flink.streaming.api.datastream.KeyedStream
+ this.initialPartitionsToLoad = allPartitionPaths.stream()
+ .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
+ .collect(Collectors.toSet());
+ }
+
+ /**
* Checks whether all the partitions of the table are loaded into the state,
* set the flag {@code allPartitionsLoaded} to true if it is.
*/
@@ -271,6 +281,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
public void clearIndexState() {
this.allPartitionsLoaded = false;
this.indexState.clear();
+ loadInitialPartitions();
}
@VisibleForTesting