You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/22 01:18:08 UTC

[hudi] branch master updated: [HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b64d22e  [HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434)
b64d22e is described below

commit b64d22e0478b967c83be22509beaaf8c5f114e19
Author: luokey <85...@qq.com>
AuthorDate: Fri Jan 22 09:17:50 2021 +0800

    [HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434)
---
 .../hudi/operator/InstantGenerateOperator.java     | 170 +++++++++++++++------
 1 file changed, 123 insertions(+), 47 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index 4e32ec7..7879243 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new
@@ -71,16 +74,20 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
   private String latestInstant = "";
   private List<String> latestInstantList = new ArrayList<>(1);
   private transient ListState<String> latestInstantState;
-  private List<StreamRecord> bufferedRecords = new LinkedList();
-  private transient ListState<StreamRecord> recordsState;
   private Integer retryTimes;
   private Integer retryInterval;
+  private static final String DELIMITER = "_";
+  private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
+  private transient boolean isMain = false;
+  private transient AtomicLong recordCounter = new AtomicLong(0);
+  private StreamingRuntimeContext runtimeContext;
+  private int indexOfThisSubtask;
 
   @Override
   public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
     if (streamRecord.getValue() != null) {
-      bufferedRecords.add(streamRecord);
       output.collect(streamRecord);
+      recordCounter.incrementAndGet();
     }
   }
 
@@ -88,7 +95,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
   public void open() throws Exception {
     super.open();
     // get configs from runtimeContext
-    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters();
 
     // retry times
     retryTimes = Integer.valueOf(cfg.blockRetryTime);
@@ -102,65 +109,78 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
     // Hadoop FileSystem
     fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
 
-    TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+    if (isMain) {
+      TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
 
-    // writeClient
-    writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+      // writeClient
+      writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
 
-    // init table, create it if not exists.
-    initTable();
+      // init table, create it if not exists.
+      initTable();
+
+      // create instant marker directory
+      createInstantMarkerDir();
+    }
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     super.prepareSnapshotPreBarrier(checkpointId);
-    // check whether the last instant is completed, if not, wait 10s and then throws an exception
-    if (!StringUtils.isNullOrEmpty(latestInstant)) {
-      doCheck();
-      // last instant completed, set it empty
-      latestInstant = "";
-    }
-
-    // no data no new instant
-    if (!bufferedRecords.isEmpty()) {
-      latestInstant = startNewInstant(checkpointId);
+    String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+    Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
+    // each subtask creates its own marker file
+    fs.create(path, true);
+    LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
+    if (isMain) {
+      // check whether the last instant is completed, if not, wait 10s and then throws an exception
+      if (!StringUtils.isNullOrEmpty(latestInstant)) {
+        doCheck();
+        // last instant completed, set it empty
+        latestInstant = "";
+      }
+      boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+      // no data no new instant
+      if (receivedDataInCurrentCP) {
+        latestInstant = startNewInstant(checkpointId);
+      }
     }
   }
 
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
-    // instantState
-    ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
-    latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
-
-    // recordState
-    ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
-    recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
-
-    if (context.isRestored()) {
-      Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
-      latestInstantIterator.forEachRemaining(x -> latestInstant = x);
-      LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
-
-      Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
-      bufferedRecords.clear();
-      recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
+    runtimeContext = getRuntimeContext();
+    indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
+    isMain = indexOfThisSubtask == 0;
+
+    if (isMain) {
+      // instantState
+      ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class);
+      latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
+
+      if (context.isRestored()) {
+        Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
+        latestInstantIterator.forEachRemaining(x -> latestInstant = x);
+        LOG.info("Restoring the latest instant [{}] from the state", latestInstant);
+      }
     }
   }
 
   @Override
   public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
-    if (latestInstantList.isEmpty()) {
-      latestInstantList.add(latestInstant);
+    long checkpointId = functionSnapshotContext.getCheckpointId();
+    long recordSize = recordCounter.get();
+    if (isMain) {
+      LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId);
+      if (latestInstantList.isEmpty()) {
+        latestInstantList.add(latestInstant);
+      } else {
+        latestInstantList.set(0, latestInstant);
+      }
+      latestInstantState.update(latestInstantList);
     } else {
-      latestInstantList.set(0, latestInstant);
+      LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId);
     }
-    latestInstantState.update(latestInstantList);
-    LOG.info("Update latest instant [{}]", latestInstant);
-
-    recordsState.update(bufferedRecords);
-    LOG.info("Update records state size = [{}]", bufferedRecords.size());
-    bufferedRecords.clear();
+    recordCounter.set(0);
   }
 
   /**
@@ -185,10 +205,10 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
     int tryTimes = 0;
     while (tryTimes < retryTimes) {
       tryTimes++;
-      StringBuffer sb = new StringBuffer();
+      StringBuilder sb = new StringBuilder();
       if (rollbackPendingCommits.contains(latestInstant)) {
         rollbackPendingCommits.forEach(x -> sb.append(x).append(","));
-        LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb.toString(), tryTimes);
+        LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes);
         TimeUnit.SECONDS.sleep(retryInterval);
         rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
       } else {
@@ -222,4 +242,60 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
       fs.close();
     }
   }
+
+  private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+    int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+    FileStatus[] fileStatuses;
+    Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
+    // wait until every subtask has created its marker file
+    while (true) {
+      Thread.sleep(500L);
+      fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+        @Override
+        public boolean accept(Path pathname) {
+          return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER));
+        }
+      });
+
+      // is ready
+      if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+        break;
+      }
+    }
+
+    boolean receivedData = false;
+    // determine whether any data was received in this checkpoint, then delete the marker files.
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      String name = path.getName();
+      // has data
+      if (Long.parseLong(name.split(DELIMITER)[2]) > 0) {
+        receivedData = true;
+        break;
+      }
+    }
+
+    // delete all marker file
+    cleanMarkerDir(instantMarkerPath);
+
+    return receivedData;
+  }
+
+  private void createInstantMarkerDir() throws IOException {
+    // Always create instantMarkerFolder which is needed for InstantGenerateOperator
+    final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);
+    if (!fs.exists(instantMarkerFolder)) {
+      fs.mkdirs(instantMarkerFolder);
+    } else {
+      // Clean marker dir.
+      cleanMarkerDir(instantMarkerFolder);
+    }
+  }
+
+  private void cleanMarkerDir(Path instantMarkerFolder) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder);
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath(), true);
+    }
+  }
 }