You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zy...@apache.org on 2023/11/06 08:56:10 UTC
(doris-flink-connector) branch master updated: [improve] add multi table sink to DorisSink (#224)
This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0b2a14a [improve] add multi table sink to DorisSink (#224)
0b2a14a is described below
commit 0b2a14a93d3b65115c25fe59d84d6b3375da517d
Author: wudi <67...@qq.com>
AuthorDate: Mon Nov 6 16:56:04 2023 +0800
[improve] add multi table sink to DorisSink (#224)
DorisSink supports multi-table import.
Example:
```java
Configuration config = new Configuration();
// config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000)));
env.enableCheckpointing(10000);
DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("127.0.0.1:8030")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("xxx12")
.setStreamLoadProp(properties)
.setDeletable(false).enable2PC();
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build());
RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
Arrays.asList(record, record1));
stringDataStreamSource.sinkTo(builder.build());
```
For details, please refer to `org.apache.doris.flink.DorisSinkStreamMultiTableExample.java`
---
.../org/apache/doris/flink/cfg/DorisOptions.java | 4 +
.../org/apache/doris/flink/sink/BackendUtil.java | 12 ++
.../org/apache/doris/flink/sink/DorisSink.java | 1 -
.../doris/flink/sink/writer/DorisStreamLoad.java | 12 +-
.../doris/flink/sink/writer/DorisWriter.java | 211 +++++++++++++++------
.../doris/flink/sink/writer/DorisWriterState.java | 32 +++-
.../sink/writer/DorisWriterStateSerializer.java | 17 +-
.../doris/flink/sink/writer/LabelGenerator.java | 19 +-
.../flink/DorisSinkStreamMultiTableExample.java | 100 ++++++++++
.../org/apache/doris/flink/sink/HttpTestUtil.java | 41 ++++
.../flink/sink/writer/TestDorisStreamLoad.java | 12 +-
.../doris/flink/sink/writer/TestDorisWriter.java | 12 +-
.../writer/TestDorisWriterStateSerializer.java | 4 +-
13 files changed, 394 insertions(+), 83 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index f560eae..6391e91 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -52,6 +52,10 @@ public class DorisOptions extends DorisConnectionOptions {
return tableIdentifier;
}
+ public void setTableIdentifier(String tableIdentifier) {
+ this.tableIdentifier = tableIdentifier;
+ }
+
public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index 0d45e2f..954bdd0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -17,7 +17,11 @@
package org.apache.doris.flink.sink;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
@@ -62,6 +66,14 @@ public class BackendUtil {
return backends;
}
+ public static BackendUtil getInstance(DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger){
+ if(StringUtils.isNotEmpty(dorisOptions.getBenodes())){
+ return new BackendUtil(dorisOptions.getBenodes());
+ } else {
+ return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger));
+ }
+ }
+
public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index bc2d45c..3adcd9e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -140,7 +140,6 @@ public class DorisSink<IN>
public DorisSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
- Preconditions.checkNotNull(serializer);
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 4fd9abf..fb904ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -114,6 +114,10 @@ public class DorisStreamLoad implements Serializable {
return db;
}
+ public String getTable() {
+ return table;
+ }
+
public String getHostPort() {
return hostPort;
}
@@ -141,7 +145,7 @@ public class DorisStreamLoad implements Serializable {
try {
// TODO: According to label abort txn. Currently, it can only be aborted based on txnid,
// so we must first request a streamload based on the label to get the txnid.
- String label = labelGenerator.generateLabel(startChkID);
+ String label = labelGenerator.generateTableLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
@@ -215,7 +219,7 @@ public class DorisStreamLoad implements Serializable {
public RespContent stopLoad(String label) throws IOException{
recordStream.endInput();
- LOG.info("stream load stopped for {} on host {}", label, hostPort);
+ LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort);
Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
@@ -233,7 +237,7 @@ public class DorisStreamLoad implements Serializable {
loadBatchFirstRecord = !isResume;
HttpPutBuilder putBuilder = new HttpPutBuilder();
recordStream.startInput(isResume);
- LOG.info("stream load started for {} on host {}", label, hostPort);
+ LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
try {
InputStreamEntity entity = new InputStreamEntity(recordStream);
putBuilder.setUrl(loadUrlStr)
@@ -247,7 +251,7 @@ public class DorisStreamLoad implements Serializable {
putBuilder.enable2PC();
}
pendingLoadFuture = executorService.submit(() -> {
- LOG.info("start execute load");
+ LOG.info("table {} start execute load", table);
return httpClient.execute(putBuilder.build());
});
} catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index b61d174..8550a21 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -23,28 +23,31 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.StreamLoadException;
-import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -62,21 +65,21 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final long lastCheckpointId;
private long curCheckpointId;
- private DorisStreamLoad dorisStreamLoad;
- volatile boolean loading;
+ private Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
+ private Map<String, LabelGenerator> labelGeneratorMap = new ConcurrentHashMap<>();;
+ volatile boolean globalLoading;
+ private Map<String, Boolean> loadingMap = new ConcurrentHashMap<>();
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final DorisExecutionOptions executionOptions;
private final String labelPrefix;
- private final LabelGenerator labelGenerator;
+ private final int subtaskId;
private final int intervalTime;
- private final DorisWriterState dorisWriterState;
private final DorisRecordSerializer<IN> serializer;
private final transient ScheduledExecutorService scheduledExecutorService;
private transient Thread executorThread;
private transient volatile Exception loadException = null;
private BackendUtil backendUtil;
- private String currentLabel;
public DorisWriter(Sink.InitContext initContext,
Collection<DorisWriterState> state,
@@ -91,106 +94,188 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
this.curCheckpointId = lastCheckpointId + 1;
LOG.info("restore checkpointId {}", lastCheckpointId);
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
- this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix());
- this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
- this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC());
+ this.labelPrefix = executionOptions.getLabelPrefix();
+ this.subtaskId = initContext.getSubtaskId();
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
this.serializer = serializer;
this.dorisOptions = dorisOptions;
this.dorisReadOptions = dorisReadOptions;
this.executionOptions = executionOptions;
this.intervalTime = executionOptions.checkInterval();
- this.loading = false;
+ this.globalLoading = false;
initializeLoad(state);
}
public void initializeLoad(Collection<DorisWriterState> state) {
- this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
- dorisOptions.getBenodes())
- : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+ this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG);
try {
- this.dorisStreamLoad = new DorisStreamLoad(
- backendUtil.getAvailableBackend(),
- dorisOptions,
- executionOptions,
- labelGenerator, new HttpUtil().getHttpClient());
- // TODO: we need check and abort all pending transaction.
- // Discard transactions that may cause the job to fail.
if(executionOptions.enabled2PC()) {
- dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
+ abortLingeringTransactions(state);
}
} catch (Exception e) {
+ LOG.error("Failed to abort transaction.", e);
throw new DorisRuntimeException(e);
}
// get main work thread.
executorThread = Thread.currentThread();
- this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
// when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
+ private void abortLingeringTransactions(Collection<DorisWriterState> recoveredStates) throws Exception {
+ List<String> alreadyAborts = new ArrayList<>();
+ //abort label in state
+ for(DorisWriterState state : recoveredStates){
+ // TODO: When the sink parallelism is reduced,
+ // the pending txns of the subtasks that no longer exist also need to be aborted.
+ if(!state.getLabelPrefix().equals(labelPrefix)){
+ LOG.warn("Label prefix from previous execution {} has changed to {}.", state.getLabelPrefix(), executionOptions.getLabelPrefix());
+ }
+ String key = state.getDatabase() + "." + state.getTable();
+ DorisStreamLoad streamLoader = getStreamLoader(key);
+ streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId);
+ alreadyAborts.add(state.getLabelPrefix());
+ }
+
+ // TODO: In a multi-table scenario, if the job is not restored from a checkpoint
+ // and labelPrefix is modified at startup, we cannot abort the previous label.
+ if(!alreadyAborts.contains(labelPrefix)
+ && StringUtils.isNotEmpty(dorisOptions.getTableIdentifier())
+ && StringUtils.isNotEmpty(labelPrefix)){
+ //abort current labelPrefix
+ DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier());
+ streamLoader.abortPreCommit(labelPrefix, curCheckpointId);
+ }
+ }
+
@Override
public void write(IN in, Context context) throws IOException {
checkLoadException();
- byte[] serialize = serializer.serialize(in);
- if(Objects.isNull(serialize)){
- //ddl record
+ Tuple2<String, byte[]> rowTuple = serializeRecord(in);
+ String tableKey = rowTuple.f0;
+ byte[] serializeRow = rowTuple.f1;
+ if(serializeRow == null){
return;
}
- if(!loading) {
+
+ DorisStreamLoad streamLoader = getStreamLoader(tableKey);
+ if(!loadingMap.containsKey(tableKey)) {
// start stream load only when there is data
- dorisStreamLoad.startLoad(currentLabel, false);
- loading = true;
+ LabelGenerator labelGenerator = getLabelGenerator(tableKey);
+ String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
+ streamLoader.startLoad(currentLabel, false);
+ loadingMap.put(tableKey, true);
+ globalLoading = true;
+ }
+ streamLoader.writeRecord(serializeRow);
+ }
+
+ private Tuple2<String, byte[]> serializeRecord(IN in) throws IOException {
+ String tableKey = dorisOptions.getTableIdentifier();
+ byte[] serializeRow = null;
+ if(serializer != null) {
+ serializeRow = serializer.serialize(in);
+ if(Objects.isNull(serializeRow)){
+ //ddl record by JsonDebeziumSchemaSerializer
+ return Tuple2.of(tableKey, null);
+ }
}
- dorisStreamLoad.writeRecord(serialize);
+ //multi table load
+ if(in instanceof RecordWithMeta){
+ RecordWithMeta row = (RecordWithMeta) in;
+ if(StringUtils.isBlank(row.getTable())
+ || StringUtils.isBlank(row.getDatabase())
+ || row.getRecord() == null){
+ LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
+ return Tuple2.of(tableKey, null);
+ }
+ tableKey = row.getDatabase() + "." + row.getTable();
+ serializeRow = row.getRecord().getBytes(StandardCharsets.UTF_8);
+ }
+ return Tuple2.of(tableKey, serializeRow);
}
@Override
public void flush(boolean flush) throws IOException, InterruptedException {
-
+ // No action is triggered here; everything happens in the prepareCommit method
}
-
@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
- if(!loading){
- //There is no data during the entire checkpoint period
+ // Verify whether data is written during a checkpoint
+ if(!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)){
return Collections.emptyList();
}
// disable exception checker before stop load.
- loading = false;
- Preconditions.checkState(dorisStreamLoad != null);
- RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
- if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
- String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
- throw new DorisRuntimeException(errMsg);
- }
- if (!executionOptions.enabled2PC()) {
- return Collections.emptyList();
+ globalLoading = false;
+ // clean loadingMap
+ loadingMap.clear();
+
+ // submit stream load http request
+ List<DorisCommittable> committableList = new ArrayList<>();
+ for(Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()){
+ String tableIdentifier = streamLoader.getKey();
+ DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
+ LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
+ String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
+ RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
+ if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+ String errMsg = String.format("table %s stream load error: %s, see more in %s", tableIdentifier, respContent.getMessage(), respContent.getErrorURL());
+ throw new DorisRuntimeException(errMsg);
+ }
+ if(executionOptions.enabled2PC()){
+ long txnId = respContent.getTxnId();
+ committableList.add(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
+ }
}
- long txnId = respContent.getTxnId();
- return Collections.singletonList(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
+ return committableList;
}
@Override
public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
- Preconditions.checkState(dorisStreamLoad != null);
- // dynamic refresh BE node
- this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+ List<DorisWriterState> writerStates = new ArrayList<>();
+ for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){
+ //Dynamic refresh backend
+ dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+ DorisWriterState writerState = new DorisWriterState(labelPrefix, dorisStreamLoad.getDb(), dorisStreamLoad.getTable(), subtaskId);
+ writerStates.add(writerState);
+ }
this.curCheckpointId = checkpointId + 1;
- this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
- return Collections.singletonList(dorisWriterState);
+ return writerStates;
+ }
+
+ private LabelGenerator getLabelGenerator(String tableKey){
+ return labelGeneratorMap.computeIfAbsent(tableKey, v-> new LabelGenerator(labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId));
+ }
+
+ private DorisStreamLoad getStreamLoader(String tableKey){
+ LabelGenerator labelGenerator = getLabelGenerator(tableKey);
+ dorisOptions.setTableIdentifier(tableKey);
+ return dorisStreamLoadMap.computeIfAbsent(tableKey, v -> new DorisStreamLoad(backendUtil.getAvailableBackend(),
+ dorisOptions,
+ executionOptions,
+ labelGenerator,
+ new HttpUtil().getHttpClient()));
}
+ /**
+ * Check the streamload http request regularly
+ */
private void checkDone() {
+ for(Map.Entry<String, DorisStreamLoad> streamLoadMap : dorisStreamLoadMap.entrySet()){
+ checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+ }
+ }
+
+ private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad){
// the load future is done and checked in prepareCommit().
// this will check error while loading.
LOG.debug("start timer checker, interval {} ms", intervalTime);
if (dorisStreamLoad.getPendingLoadFuture() != null
&& dorisStreamLoad.getPendingLoadFuture().isDone()) {
- if (!loading) {
- LOG.debug("not loading, skip timer checker");
+ if (!globalLoading || !loadingMap.get(tableIdentifier)) {
+ LOG.debug("not loading, skip timer checker for table {}", tableIdentifier);
return;
}
@@ -202,13 +287,14 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
// use send cached data to new txn, then notify to restart the stream
if (executionOptions.isUseCache()) {
try {
- this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+ dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
if (executionOptions.enabled2PC()) {
dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
}
// start a new txn(stream load)
- LOG.info("getting exception, breakpoint resume for checkpoint ID: {}", curCheckpointId);
- dorisStreamLoad.startLoad(labelGenerator.generateLabel(curCheckpointId), true);
+ LOG.info("getting exception, breakpoint resume for checkpoint ID: {}, table {}", curCheckpointId, tableIdentifier);
+ LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
+ dorisStreamLoad.startLoad(labelGenerator.generateTableLabel(curCheckpointId), true);
} catch (Exception e) {
throw new DorisRuntimeException(e);
}
@@ -222,7 +308,7 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
}
loadException = new StreamLoadException(errorMsg);
- LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
+ LOG.error("table {} stream load finished unexpectedly, interrupt worker thread! {}", tableIdentifier, errorMsg);
// set the executor thread interrupted in case blocking in write data.
executorThread.interrupt();
}
@@ -238,21 +324,24 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
@VisibleForTesting
public boolean isLoading() {
- return this.loading;
+ return this.globalLoading;
}
@VisibleForTesting
- public void setDorisStreamLoad(DorisStreamLoad streamLoad) {
- this.dorisStreamLoad = streamLoad;
+ public void setDorisStreamLoadMap(Map<String, DorisStreamLoad> streamLoadMap) {
+ this.dorisStreamLoadMap = streamLoadMap;
}
@Override
public void close() throws Exception {
+ LOG.info("Close DorisWriter.");
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
- if (dorisStreamLoad != null) {
- dorisStreamLoad.close();
+ if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {
+ for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){
+ dorisStreamLoad.close();
+ }
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java
index 1ec9727..a08e58d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java
@@ -24,14 +24,36 @@ import java.util.Objects;
*/
public class DorisWriterState {
String labelPrefix;
+ String database;
+ String table;
+ int subtaskId;
public DorisWriterState(String labelPrefix) {
this.labelPrefix = labelPrefix;
}
+ public DorisWriterState(String labelPrefix, String database, String table, int subtaskId) {
+ this.labelPrefix = labelPrefix;
+ this.database = database;
+ this.table = table;
+ this.subtaskId = subtaskId;
+ }
+
public String getLabelPrefix() {
return labelPrefix;
}
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public int getSubtaskId() {
+ return subtaskId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -41,18 +63,24 @@ public class DorisWriterState {
return false;
}
DorisWriterState that = (DorisWriterState) o;
- return Objects.equals(labelPrefix, that.labelPrefix);
+ return Objects.equals(labelPrefix, that.labelPrefix)
+ && Objects.equals(database, that.database)
+ && Objects.equals(table, that.table)
+ && Objects.equals(subtaskId, that.subtaskId);
}
@Override
public int hashCode() {
- return Objects.hash(labelPrefix);
+ return Objects.hash(labelPrefix, database, table, subtaskId);
}
@Override
public String toString() {
return "DorisWriterState{" +
"labelPrefix='" + labelPrefix + '\'' +
+ ", database='" + database + '\'' +
+ ", table='" + table + '\'' +
+ ", subtaskId=" + subtaskId +
'}';
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java
index aed97b2..97f5cc1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java
@@ -30,9 +30,10 @@ import java.io.IOException;
*/
public class DorisWriterStateSerializer implements SimpleVersionedSerializer<DorisWriterState> {
+ private static final int VERSION = 2;
@Override
public int getVersion() {
- return 1;
+ return VERSION;
}
@Override
@@ -40,6 +41,9 @@ public class DorisWriterStateSerializer implements SimpleVersionedSerializer<Dor
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
out.writeUTF(dorisWriterState.getLabelPrefix());
+ out.writeUTF(dorisWriterState.getDatabase());
+ out.writeUTF(dorisWriterState.getTable());
+ out.writeInt(dorisWriterState.getSubtaskId());
out.flush();
return baos.toByteArray();
}
@@ -49,8 +53,15 @@ public class DorisWriterStateSerializer implements SimpleVersionedSerializer<Dor
public DorisWriterState deserialize(int version, byte[] serialized) throws IOException {
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
final DataInputStream in = new DataInputStream(bais)) {
- final String labelPrefix = in.readUTF();
- return new DorisWriterState(labelPrefix);
+ String labelPrefix = in.readUTF();
+ if(version == 1){
+ return new DorisWriterState(labelPrefix);
+ }else {
+ final String database = in.readUTF();
+ final String table = in.readUTF();
+ final int subtaskId = in.readInt();
+ return new DorisWriterState(labelPrefix, database, table, subtaskId);
+ }
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 55f7811..60f5595 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -16,6 +16,8 @@
// under the License.
package org.apache.doris.flink.sink.writer;
+import org.apache.flink.util.Preconditions;
+
import java.util.UUID;
/**
@@ -24,14 +26,29 @@ import java.util.UUID;
public class LabelGenerator {
private String labelPrefix;
private boolean enable2PC;
+ private String tableIdentifier;
+ private int subtaskId;
public LabelGenerator(String labelPrefix, boolean enable2PC) {
this.labelPrefix = labelPrefix;
this.enable2PC = enable2PC;
}
+ public LabelGenerator(String labelPrefix, boolean enable2PC, String tableIdentifier, int subtaskId) {
+ this(labelPrefix, enable2PC);
+ // The label of stream load can not contain `.`
+ this.tableIdentifier = tableIdentifier.replace(".", "_");
+ this.subtaskId = subtaskId;
+ }
+
public String generateLabel(long chkId) {
- String label = labelPrefix + "_" + chkId;
+ String label = String.format("%s_%s_%s", labelPrefix, subtaskId, chkId);
+ return enable2PC ? label : label + "_" + UUID.randomUUID();
+ }
+
+ public String generateTableLabel(long chkId) {
+ Preconditions.checkState(tableIdentifier != null);
+ String label = String.format("%s_%s_%s_%s", labelPrefix, tableIdentifier, subtaskId, chkId);
return enable2PC ? label : label + "_" + UUID.randomUUID();
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
new file mode 100644
index 0000000..a884ea2
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkStreamMultiTableExample { // Example job: a single DorisSink fed RecordWithMeta rows that name several target tables.
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+// config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.setParallelism(1);
+ env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/");
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000)));
+ env.enableCheckpointing(10000);
+ DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
+ final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("line_delimiter", "\n");
+ properties.setProperty("format", "csv");
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("") // left empty here; presumably each RecordWithMeta names its own database and table (confirm against RecordWithMeta)
+ .setUsername("root")
+ .setPassword("");
+ // Execution options: label prefix, the CSV stream load properties above, and two-phase commit.
+ DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+ executionBuilder.setLabelPrefix("xxx12")
+ .setStreamLoadProp(properties)
+ .setDeletable(false).enable2PC();
+
+ builder.setDorisReadOptions(readOptionBuilder.build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setDorisOptions(dorisBuilder.build());
+ // Bounded alternative input, kept for reference:
+// RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
+// RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
+// DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
+// Arrays.asList(record, record1));
+// stringDataStreamSource.sinkTo(builder.build());
+ // Unbounded source: every 3 seconds emit one row each for test_flink_a and test_flink_b.
+ env.addSource(new ParallelSourceFunction<RecordWithMeta>() {
+ private long id = 0L; // iteration counter; starts at 0 so the `id > 100` gate below is false for the first 100 iterations
+ @Override
+ public void run(SourceContext<RecordWithMeta> out) throws Exception {
+ while (true) {
+ id = id + 1;
+ RecordWithMeta record = new RecordWithMeta("test", "test_flink_a", UUID.randomUUID() + ",1");
+ out.collect(record);
+ record = new RecordWithMeta("test", "test_flink_b", UUID.randomUUID() + ",2");
+ out.collect(record);
+ if (id > 100) {
+ // Mock a dynamically added table: test_flink_c only receives rows after the first 100 iterations.
+ RecordWithMeta record3 = new RecordWithMeta("test", "test_flink_c", UUID.randomUUID() + ",1");
+ out.collect(record3);
+ }
+ Thread.sleep(3000);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // No-op: the loop in run() has no stop flag to clear.
+ }
+ }).sinkTo(builder.build());
+
+ env.execute("doris stream multi table test");
+ }
+
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
index 67e8ba3..99e9846 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
@@ -50,6 +50,27 @@ public class HttpTestUtil {
"}\n" +
"\n";
+ public static final String LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE = "{\n" +
+ "\"TxnId\": 1,\n" +
+ "\"Label\": \"test001_db_table_0_1\",\n" +
+ "\"TwoPhaseCommit\": \"true\",\n" +
+ "\"Status\": \"Label Already Exists\",\n" +
+ "\"ExistingJobStatus\": \"PRECOMMITTED\",\n" +
+ "\"Message\": \"errCode = 2, detailMessage = Label [test001_db_table_0_1] has already been used, relate to txn [1]\",\n" +
+ "\"NumberTotalRows\": 0,\n" +
+ "\"NumberLoadedRows\": 0,\n" +
+ "\"NumberFilteredRows\": 0,\n" +
+ "\"NumberUnselectedRows\": 0,\n" +
+ "\"LoadBytes\": 0,\n" +
+ "\"LoadTimeMs\": 0,\n" +
+ "\"BeginTxnTimeMs\": 0,\n" +
+ "\"StreamLoadPutTimeMs\": 0,\n" +
+ "\"ReadDataTimeMs\": 0,\n" +
+ "\"WriteDataTimeMs\": 0,\n" +
+ "\"CommitAndPublishTimeMs\": 0\n" +
+ "}\n" +
+ "\n";
+
public static final String PRE_COMMIT_RESPONSE = "{\n" +
"\"TxnId\": 2,\n" +
"\"Label\": \"test001_0_2\",\n" +
@@ -70,6 +91,26 @@ public class HttpTestUtil {
"}\n" +
"\n";
+ public static final String PRE_COMMIT_TABLE_RESPONSE = "{\n" +
+ "\"TxnId\": 2,\n" +
+ "\"Label\": \"test001_db_table_0_2\",\n" +
+ "\"TwoPhaseCommit\": \"true\",\n" +
+ "\"Status\": \"Success\",\n" +
+ "\"Message\": \"OK\",\n" +
+ "\"NumberTotalRows\": 0,\n" +
+ "\"NumberLoadedRows\": 0,\n" +
+ "\"NumberFilteredRows\": 0,\n" +
+ "\"NumberUnselectedRows\": 0,\n" +
+ "\"LoadBytes\": 0,\n" +
+ "\"LoadTimeMs\": 0,\n" +
+ "\"BeginTxnTimeMs\": 0,\n" +
+ "\"StreamLoadPutTimeMs\": 0,\n" +
+ "\"ReadDataTimeMs\": 0,\n" +
+ "\"WriteDataTimeMs\": 0,\n" +
+ "\"CommitAndPublishTimeMs\": 0\n" +
+ "}\n" +
+ "\n";
+
public static final String ABORT_SUCCESS_RESPONSE = "{\n" +
"\"status\": \"Success\",\n" +
"\"msg\": \"transaction [1] abort successfully.\"\n" +
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index e295de1..8f292e2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -56,13 +56,13 @@ public class TestDorisStreamLoad {
@Test
public void testAbortPreCommit() throws Exception {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
- CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_RESPONSE, true);
- CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
+ CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE, true);
+ CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(existLabelResponse, preCommitResponse); // first execute() call gets the label-exists response, later calls the pre-commit response
- DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient));
+ DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true, "db.table", 0), httpClient)); // this generator's table label for checkpoint 1 is test001_db_table_0_1, the label in the mocked label-exists response
doNothing().when(dorisStreamLoad).abortTransaction(anyLong()); // stub abortTransaction on the spy so it does nothing
- dorisStreamLoad.abortPreCommit("test001_0", 1);
+ dorisStreamLoad.abortPreCommit("test001", 1);
}
@Test
@@ -70,7 +70,7 @@ public class TestDorisStreamLoad {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(abortSuccessResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true), httpClient);
dorisStreamLoad.abortTransaction(anyLong());
}
@@ -79,7 +79,7 @@ public class TestDorisStreamLoad {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse abortFailedResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_FAILED_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(abortFailedResponse);
- DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true), httpClient);
dorisStreamLoad.abortTransaction(anyLong());
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 01e1559..36f98c8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -34,7 +34,9 @@ import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -61,13 +63,14 @@ public class TestDorisWriter {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
-
+ Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+ dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad);
dorisStreamLoad.startLoad("", false);
Sink.InitContext initContext = mock(Sink.InitContext.class);
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
- dorisWriter.setDorisStreamLoad(dorisStreamLoad);
+ dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
dorisWriter.write("doris,1",null);
Collection<DorisCommittable> committableList = dorisWriter.prepareCommit();
Assert.assertEquals(1, committableList.size());
@@ -84,11 +87,14 @@ public class TestDorisWriter {
CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
when(httpClient.execute(any())).thenReturn(preCommitResponse);
+ Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+ dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad);
+
Sink.InitContext initContext = mock(Sink.InitContext.class);
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
- dorisWriter.setDorisStreamLoad(dorisStreamLoad);
+ dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
List<DorisWriterState> writerStates = dorisWriter.snapshotState(1);
Assert.assertEquals(1, writerStates.size());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java
index 84695d7..fce2a26 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java
@@ -26,9 +26,9 @@ import org.junit.Test;
public class TestDorisWriterStateSerializer {
@Test
public void testSerialize() throws Exception { // round trip: serialize a state, deserialize the bytes, expect an equal object
- DorisWriterState expectDorisWriterState = new DorisWriterState("doris");
+ DorisWriterState expectDorisWriterState = new DorisWriterState("doris", "db", "table", 0);
DorisWriterStateSerializer serializer = new DorisWriterStateSerializer();
- DorisWriterState dorisWriterState = serializer.deserialize(1, serializer.serialize(expectDorisWriterState));
+ DorisWriterState dorisWriterState = serializer.deserialize(2, serializer.serialize(expectDorisWriterState)); // first argument changed from 1 to 2; presumably the serializer version, confirm against DorisWriterStateSerializer
Assert.assertEquals(expectDorisWriterState, dorisWriterState);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org