Posted to commits@hudi.apache.org by wa...@apache.org on 2021/06/09 11:30:16 UTC

[hudi] branch master updated: delete duplicate bootstrap function (#3052)

This is an automated email from the ASF dual-hosted git repository.

wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 728089a  delete duplicate bootstrap function (#3052)
728089a is described below

commit 728089a88827c3f4e9813f2d2e4d3d4a4a872605
Author: yuzhaojing <32...@users.noreply.github.com>
AuthorDate: Wed Jun 9 19:29:57 2021 +0800

    delete duplicate bootstrap function (#3052)
    
    Co-authored-by: 喻兆靖 <yu...@bilibili.com>
---
 .../hudi/sink/bootstrap/BootstrapFunction.java     | 80 ++++++++++------------
 .../{BootstrapRecord.java => IndexRecord.java}     |  6 +-
 .../bootstrap/aggregate/BootstrapAccumulator.java  |  2 +-
 ...strapAggFunc.java => BootstrapAggFunction.java} |  7 +-
 .../sink/partitioner/BucketAssignFunction.java     | 10 +--
 .../org/apache/hudi/table/HoodieTableSink.java     |  6 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |  4 +-
 7 files changed, 53 insertions(+), 62 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 5f81559..503a5bf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -18,22 +18,6 @@
 
 package org.apache.hudi.sink.bootstrap;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
@@ -47,11 +31,26 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndexUtils;
-import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,18 +60,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 /**
- * The function to load index from exists hoodieTable.
+ * The function to load index from existing hoodieTable.
  *
- * <p>Each subtask in bootstrapFunction triggers the bootstrap index with the first element,
- * Received record cannot be sent until the index is all sent.
+ * <p>Each subtask of the function triggers the index bootstrap when the first element comes in;
+ * the record cannot be sent until all the index records have been sent.
  *
  * <p>The output records should then be shuffled by the recordKey to make the write scalable.
- *
- * @see BootstrapFunction
  */
 public class BootstrapFunction<I, O extends HoodieRecord>
-      extends ProcessFunction<I, O>
-      implements CheckpointedFunction, CheckpointListener {
+    extends ProcessFunction<I, O>
+    implements CheckpointedFunction, CheckpointListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
 
@@ -95,12 +92,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
 
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
-    this.bootstrapState = context.getOperatorStateStore().getListState(
-       new ListStateDescriptor<>(
-           "bootstrap-state",
-           TypeInformation.of(new TypeHint<Boolean>() {})
-       )
-    );
+    this.bootstrapState = context.getOperatorStateStore()
+        .getListState(new ListStateDescriptor<>("bootstrap-state", Types.BOOLEAN));
 
     if (context.isRestored()) {
       LOG.info("Restoring state for the {}.", getClass().getSimpleName());
@@ -123,8 +116,9 @@ public class BootstrapFunction<I, O extends HoodieRecord>
   @SuppressWarnings("unchecked")
   public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
     if (!alreadyBootstrap) {
-      LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask());
       String basePath = hoodieTable.getMetaClient().getBasePath();
+      int taskID = getRuntimeContext().getIndexOfThisSubtask();
+      LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
       for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
         if (pattern.matcher(partitionPath).matches()) {
           loadRecords(partitionPath, out);
@@ -132,30 +126,30 @@ public class BootstrapFunction<I, O extends HoodieRecord>
       }
 
       // wait for the other bootstrap tasks to finish the index bootstrap.
-      updateAndWaiting();
+      waitForBootstrapReady(taskID);
 
       alreadyBootstrap = true;
-      LOG.info("Finish send index to BucketAssign, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+      LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
     }
 
-    // send data to next operator
+    // send the trigger record
     out.collect((O) value);
   }
 
   /**
-   * Wait for other bootstrap task send bootstrap complete.
+   * Wait for other bootstrap tasks to finish the index bootstrap.
    */
-  private void updateAndWaiting() {
+  private void waitForBootstrapReady(int taskID) {
     int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
     int readyTaskNum = 1;
     while (taskNum != readyTaskNum) {
       try {
-        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc());
-        LOG.info("Waiting for others bootstrap task complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
+        LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
 
         TimeUnit.SECONDS.sleep(5);
       } catch (Exception e) {
-        LOG.warn("update global aggregate error", e);
+        LOG.warn("Update global task bootstrap summary error", e);
       }
     }
   }
@@ -178,7 +172,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
     long start = System.currentTimeMillis();
     BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
     List<HoodieBaseFile> latestBaseFiles =
-            HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
+        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
     LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
 
     final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
@@ -193,20 +187,20 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         final List<HoodieKey> hoodieKeys;
         try {
           hoodieKeys =
-                    fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
+              fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
         } catch (Exception e) {
           throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
         }
 
         for (HoodieKey hoodieKey : hoodieKeys) {
-          out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile)));
+          out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile)));
         }
       }
     }
 
     long cost = System.currentTimeMillis() - start;
     LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
-            this.getClass().getSimpleName(), taskID, partitionPath, cost);
+        this.getClass().getSimpleName(), taskID, partitionPath, cost);
   }
 
   @SuppressWarnings("unchecked")
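
The barrier in waitForBootstrapReady leans on Flink's GlobalAggregateManager: each call to updateGlobalAggregate ships this subtask's id to the JobManager, folds it into a named global aggregate, and returns the aggregate result, so every subtask can poll until all of its peers have reported in. A minimal sketch of the pattern, assuming it runs inside a rich function where getRuntimeContext() is available (the method name readyTaskBarrier is illustrative, not from the commit):

    import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

    import java.util.concurrent.TimeUnit;

    // Block the current subtask until every parallel subtask has reported ready.
    private void readyTaskBarrier(int taskID) throws Exception {
      GlobalAggregateManager manager =
          ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
      int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
      int readyTaskNum = 0;
      while (readyTaskNum < taskNum) {
        // Sends this subtask's id to the JobManager and gets back the number
        // of distinct ids the global aggregate has seen so far.
        readyTaskNum = manager.updateGlobalAggregate(
            BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
        TimeUnit.SECONDS.sleep(5); // poll rather than busy-spin
      }
    }
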
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/IndexRecord.java
similarity index 84%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/IndexRecord.java
index 025d844..2fe83b7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/IndexRecord.java
@@ -22,12 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 
 /**
- * An record to mark HoodieRecord or IndexRecord.
+ * The index record.
  */
-public class BootstrapRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
+public class IndexRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
   private static final long serialVersionUID = 1L;
 
-  public BootstrapRecord(HoodieRecord<T> record) {
+  public IndexRecord(HoodieRecord<T> record) {
     super(record);
   }
 }
\ No newline at end of file
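
The rename from BootstrapRecord to IndexRecord also makes the pattern clearer: the class adds no fields and exists purely as a marker subclass, so operators further down the pipeline can separate index records from data records with a runtime type check. An abbreviated sketch of both sides of that contract, reusing names from the hunks above and below:

    // Producer side (loadRecords above): tag each bootstrapped key.
    out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile)));

    // Consumer side (BucketAssignFunction below): route on the runtime type.
    if (record instanceof IndexRecord) {
      // warm up the index state
    } else {
      // normal write path
    }
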
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
index 80067f0..14630a1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * Aggregate accumulator.
+ * Bootstrap ready task id accumulator.
  */
 public class BootstrapAccumulator implements Serializable {
   private static final long serialVersionUID = 1L;
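
Only the javadoc changes in this file, but the surrounding imports (HashSet, Set) and the new description suggest the accumulator is a serializable set of ready subtask ids, which keeps the repeated reports from the polling loop idempotent. A sketch of that shape, with method and field names assumed rather than taken from the diff:

    import java.io.Serializable;
    import java.util.HashSet;
    import java.util.Set;

    public class BootstrapAccumulator implements Serializable {
      private static final long serialVersionUID = 1L;

      // Subtasks that reported their index bootstrap as finished; a Set means
      // a subtask that reports twice while polling is still counted once.
      private final Set<Integer> readyTasks = new HashSet<>();

      public void update(int taskId) {
        this.readyTasks.add(taskId);
      }

      public void merge(BootstrapAccumulator other) {
        this.readyTasks.addAll(other.readyTasks);
      }

      public int readyTaskNum() {
        return this.readyTasks.size();
      }
    }
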
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
similarity index 86%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
index 2233e84..075de6d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
@@ -21,10 +21,11 @@ package org.apache.hudi.sink.bootstrap.aggregate;
 import org.apache.flink.api.common.functions.AggregateFunction;
 
 /**
- * Aggregate Function that accumulates the loaded task number of function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}.
+ * Aggregate Function that accumulates the loaded task number of
+ * function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}.
  */
-public class BootstrapAggFunc implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
-  public static final String NAME = BootstrapAggFunc.class.getSimpleName();
+public class BootstrapAggFunction implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
+  public static final String NAME = BootstrapAggFunction.class.getSimpleName();
 
   @Override
   public BootstrapAccumulator createAccumulator() {
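
The hunk cuts off before the rest of the AggregateFunction contract. For orientation, the remaining three methods plausibly just delegate to the accumulator; the bodies below are assumptions consistent with the accumulator sketch above:

    @Override
    public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator acc) {
      acc.update(taskId); // mark this subtask as ready
      return acc;
    }

    @Override
    public Integer getResult(BootstrapAccumulator acc) {
      return acc.readyTaskNum(); // distinct subtasks seen so far
    }

    @Override
    public BootstrapAccumulator merge(BootstrapAccumulator a, BootstrapAccumulator b) {
      a.merge(b); // union of the two ready sets
      return a;
    }
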
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 00a8274..75a3454 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapRecord;
+import org.apache.hudi.sink.bootstrap.IndexRecord;
 import org.apache.hudi.sink.utils.PayloadCreation;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.StreamerUtil;
@@ -151,10 +151,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 
   @Override
   public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
-    if (value instanceof BootstrapRecord) {
-      BootstrapRecord bootstrapRecord = (BootstrapRecord) value;
-      this.context.setCurrentKey(bootstrapRecord.getRecordKey());
-      this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation());
+    if (value instanceof IndexRecord) {
+      IndexRecord<?> indexRecord = (IndexRecord<?>) value;
+      this.context.setCurrentKey(indexRecord.getRecordKey());
+      this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
     } else {
       processRecord((HoodieRecord<?>) value, out);
     }
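
The cast to HoodieRecordGlobalLocation works because the index records built in loadRecords carry each key's file location as their current location. What the warm-up buys the normal path is a state lookup instead of a file scan; processRecord is not shown in this diff, so the following is only an assumption about its shape:

    // Inside processRecord(record, out): the keyed state was pre-filled by the
    // IndexRecord branch, so an existing key resolves without touching storage.
    HoodieRecordGlobalLocation oldLoc = this.indexState.value();
    if (oldLoc != null) {
      // The key already lives in a base file: send the update to that file group.
    } else {
      // New key: ask the bucket assigner for a fresh bucket.
    }
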
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 0d8f7f2..4fbcbb5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -24,7 +24,6 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bootstrap.BootstrapRecord;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -88,11 +87,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       }
 
       DataStream<Object> pipeline = hoodieDataStream
-           .transform("index_bootstrap",
-                  TypeInformation.of(BootstrapRecord.class),
-                  new ProcessOperator<>(new BootstrapFunction<>(conf)))
           // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-          .keyBy(BootstrapRecord::getRecordKey)
+          .keyBy(HoodieRecord::getRecordKey)
           .transform(
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
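
Because IndexRecord extends HoodieRecord, a single key selector now serves both the index stream and the data stream, which is what lets the duplicated bootstrap wiring above be dropped. A sketch of the resulting pipeline shape; the operator construction after the truncation is assumed and the generics are simplified:

    DataStream<HoodieRecord> pipeline = hoodieDataStream
        // index records and data records with the same key meet on the same subtask
        .keyBy(HoodieRecord::getRecordKey)
        .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)));
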
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 705780c..9ba8b8e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bootstrap.BootstrapRecord;
+import org.apache.hudi.sink.bootstrap.IndexRecord;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -157,7 +157,7 @@ public class StreamWriteFunctionWrapper<I> {
       Collector<HoodieRecord<?>> bootstrapCollector = new Collector<HoodieRecord<?>>() {
         @Override
         public void collect(HoodieRecord<?> record) {
-          if (record instanceof BootstrapRecord) {
+          if (record instanceof IndexRecord) {
             bootstrapRecords.add(record);
           }
         }