You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/01/22 01:18:08 UTC
[hudi] branch master updated: [HUDI-1511] InstantGenerateOperator
support multiple parallelism (#2434)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b64d22e [HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434)
b64d22e is described below
commit b64d22e0478b967c83be22509beaaf8c5f114e19
Author: luokey <85...@qq.com>
AuthorDate: Fri Jan 22 09:17:50 2021 +0800
[HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434)
---
.../hudi/operator/InstantGenerateOperator.java | 170 +++++++++++++++------
1 file changed, 123 insertions(+), 47 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index 4e32ec7..7879243 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new
@@ -71,16 +74,20 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
private String latestInstant = "";
private List<String> latestInstantList = new ArrayList<>(1);
private transient ListState<String> latestInstantState;
- private List<StreamRecord> bufferedRecords = new LinkedList();
- private transient ListState<StreamRecord> recordsState;
private Integer retryTimes;
private Integer retryInterval;
+ private static final String DELIMITER = "_";
+ private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
+ // Set in initializeState(): only subtask 0 generates instants.
+ private transient boolean isMain = false;
+ // Records forwarded since the last snapshotState(); encoded into this subtask's marker file name.
+ // Deliberately NOT transient: Flink ships the operator to the tasks via Java serialization, which does not
+ // re-run field initializers, so a transient counter would be null when processElement() runs.
+ // AtomicLong is Serializable, so it travels with the operator.
+ private AtomicLong recordCounter = new AtomicLong(0);
+ // Runtime-only handle assigned in initializeState(); it must never be serialized with the operator.
+ private transient StreamingRuntimeContext runtimeContext;
+ private int indexOfThisSubtask;
@Override
public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
if (streamRecord.getValue() != null) {
- bufferedRecords.add(streamRecord);
output.collect(streamRecord);
+ recordCounter.incrementAndGet();
}
}
@@ -88,7 +95,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
public void open() throws Exception {
super.open();
// get configs from runtimeContext
- cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ // runtimeContext is assigned in initializeState(); this line relies on initializeState() running before open().
+ cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters();
// retry times
retryTimes = Integer.valueOf(cfg.blockRetryTime);
@@ -102,65 +109,78 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
// Hadoop FileSystem
fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
- TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null);
+ // Only subtask 0 (isMain) creates the write client, initializes the table and prepares the marker folder.
+ // Every other subtask only needs `fs` (above) to publish its per-checkpoint marker file.
+ if (isMain) {
+ TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
- // writeClient
- writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
+ // writeClient
+ writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
- // init table, create it if not exists.
- initTable();
+ // init table, create it if not exists.
+ initTable();
+
+ // create instant marker directory
+ createInstantMarkerDir();
+ }
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
- // check whether the last instant is completed, if not, wait 10s and then throws an exception
- if (!StringUtils.isNullOrEmpty(latestInstant)) {
- doCheck();
- // last instant completed, set it empty
- latestInstant = "";
- }
-
- // no data no new instant
- if (!bufferedRecords.isEmpty()) {
- latestInstant = startNewInstant(checkpointId);
+ // Every subtask (including subtask 0) publishes an empty marker file named
+ // "<subtaskIndex>_<checkpointId>_<recordCount>" so subtask 0 can tell whether any subtask received data.
+ String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
+ // Resolve under the table base path (the same folder createInstantMarkerDir prepares), not relative to the
+ // FileSystem working directory, so tables never share a marker folder and the startup cleanup hits this folder.
+ Path path = new Path(getInstantMarkerFolder(), instantMarkerFileName);
+ // Only the file name carries information: close the stream right away so the file is finalized
+ // and no open output stream is leaked per checkpoint.
+ fs.create(path, true).close();
+ LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
+ if (isMain) {
+ // check whether the last instant is completed, if not, wait 10s and then throws an exception
+ if (!StringUtils.isNullOrEmpty(latestInstant)) {
+ doCheck();
+ // last instant completed, set it empty
+ latestInstant = "";
+ }
+ boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
+ // no data no new instant
+ if (receivedDataInCurrentCP) {
+ latestInstant = startNewInstant(checkpointId);
+ }
}
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
- // instantState
- ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class);
- latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
-
- // recordState
- ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class);
- recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor);
-
- if (context.isRestored()) {
- Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
- latestInstantIterator.forEachRemaining(x -> latestInstant = x);
- LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant);
-
- Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
- bufferedRecords.clear();
- recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
+ runtimeContext = getRuntimeContext();
+ indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
+ isMain = indexOfThisSubtask == 0;
+
+ if (isMain) {
+ // instantState
+ ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class);
+ latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
+
+ if (context.isRestored()) {
+ Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
+ latestInstantIterator.forEachRemaining(x -> latestInstant = x);
+ LOG.info("Restoring the latest instant [{}] from the state", latestInstant);
+ }
}
}
@Override
public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
- if (latestInstantList.isEmpty()) {
- latestInstantList.add(latestInstant);
+ long checkpointId = functionSnapshotContext.getCheckpointId();
+ long recordSize = recordCounter.get();
+ if (isMain) {
+ LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId);
+ if (latestInstantList.isEmpty()) {
+ latestInstantList.add(latestInstant);
+ } else {
+ latestInstantList.set(0, latestInstant);
+ }
+ latestInstantState.update(latestInstantList);
} else {
- latestInstantList.set(0, latestInstant);
+ LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId);
}
- latestInstantState.update(latestInstantList);
- LOG.info("Update latest instant [{}]", latestInstant);
-
- recordsState.update(bufferedRecords);
- LOG.info("Update records state size = [{}]", bufferedRecords.size());
- bufferedRecords.clear();
+ recordCounter.set(0);
}
/**
@@ -185,10 +205,10 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
int tryTimes = 0;
while (tryTimes < retryTimes) {
tryTimes++;
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
if (rollbackPendingCommits.contains(latestInstant)) {
rollbackPendingCommits.forEach(x -> sb.append(x).append(","));
- LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb.toString(), tryTimes);
+ LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes);
TimeUnit.SECONDS.sleep(retryInterval);
rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
} else {
@@ -222,4 +242,60 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
fs.close();
}
}
+
+ /**
+ * Waits until every parallel subtask has created its marker file for {@code checkpointId}, then reports
+ * whether any of them received records since the previous checkpoint.
+ *
+ * <p>Only the marker files of this checkpoint are deleted afterwards, so a marker that another subtask may
+ * already have written for a later checkpoint is not lost.
+ *
+ * <p>NOTE(review): the wait has no timeout; if a subtask never writes its marker this blocks forever.
+ *
+ * @param checkpointId the checkpoint whose marker files are inspected
+ * @return {@code true} if at least one marker file reports a record count greater than zero
+ */
+ private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
+ int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+ Path instantMarkerPath = getInstantMarkerFolder();
+ // the checkpoint id is the middle field of "<subtaskIndex>_<checkpointId>_<recordCount>"
+ String checkpointTag = DELIMITER + checkpointId + DELIMITER;
+ FileStatus[] fileStatuses;
+ // wait until all subtasks have created their marker files for this checkpoint
+ while (true) {
+ fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
+ @Override
+ public boolean accept(Path pathname) {
+ return pathname.getName().contains(checkpointTag);
+ }
+ });
+ // is ready
+ if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
+ break;
+ }
+ Thread.sleep(500L);
+ }
+
+ boolean receivedData = false;
+ for (FileStatus fileStatus : fileStatuses) {
+ // the third field is the number of records that subtask received
+ if (Long.parseLong(fileStatus.getPath().getName().split(DELIMITER)[2]) > 0) {
+ receivedData = true;
+ break;
+ }
+ }
+
+ // delete this checkpoint's marker files only
+ for (FileStatus fileStatus : fileStatuses) {
+ fs.delete(fileStatus.getPath(), false);
+ }
+
+ return receivedData;
+ }
+
+ /**
+ * Creates the instant marker folder under the table base path, or empties it if it already exists
+ * (e.g. markers left over from a previous run).
+ */
+ private void createInstantMarkerDir() throws IOException {
+ final Path instantMarkerFolder = getInstantMarkerFolder();
+ if (!fs.exists(instantMarkerFolder)) {
+ fs.mkdirs(instantMarkerFolder);
+ } else {
+ // Clean marker dir.
+ cleanMarkerDir(instantMarkerFolder);
+ }
+ }
+
+ /**
+ * Returns {@code <targetBasePath>/<AUXILIARYFOLDER_NAME>/.instant_marker}, the single folder used for
+ * writing, listing and cleaning marker files.
+ */
+ private Path getInstantMarkerFolder() {
+ return new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);
+ }
+
+ /** Deletes every entry inside {@code instantMarkerFolder}, keeping the folder itself. */
+ private void cleanMarkerDir(Path instantMarkerFolder) throws IOException {
+ FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder);
+ for (FileStatus fileStatus : fileStatuses) {
+ fs.delete(fileStatus.getPath(), true);
+ }
+ }
}