You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zh...@apache.org on 2023/05/30 11:02:44 UTC
[seatunnel] branch dev updated: [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6672e9407 [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
6672e9407 is described below
commit 6672e940773224d0b4c6bc40a789126cc8c69dbd
Author: Carl-Zhou-CN <67...@users.noreply.github.com>
AuthorDate: Tue May 30 19:02:38 2023 +0800
[Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
Co-authored-by: zhouyao <ya...@marketingforce.com>
---
.../org/apache/seatunnel/connectors/doris/sink/DorisSink.java | 11 +++++++++--
.../connectors/doris/sink/writer/DorisSinkWriter.java | 6 ++++--
2 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 018eb44bd..2c6d6ae74 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.doris.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -54,6 +55,7 @@ public class DorisSink
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
+ private String jobId;
@Override
public String getPluginName() {
@@ -78,6 +80,11 @@ public class DorisSink
}
}
+ @Override
+ public void setJobContext(JobContext jobContext) {
+ this.jobId = jobContext.getJobId();
+ }
+
@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
@@ -93,7 +100,7 @@ public class DorisSink
SinkWriter.Context context) throws IOException {
DorisSinkWriter dorisSinkWriter =
new DorisSinkWriter(
- context, Collections.emptyList(), seaTunnelRowType, pluginConfig);
+ context, Collections.emptyList(), seaTunnelRowType, pluginConfig, jobId);
dorisSinkWriter.initializeLoad(Collections.emptyList());
return dorisSinkWriter;
}
@@ -102,7 +109,7 @@ public class DorisSink
public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
DorisSinkWriter dorisWriter =
- new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig);
+ new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig, jobId);
dorisWriter.initializeLoad(states);
return dorisWriter;
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 744db83e6..ac0927f08 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -78,13 +78,15 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
SinkWriter.Context context,
List<DorisSinkState> state,
SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig) {
+ Config pluginConfig,
+ String jobId) {
this.dorisConfig = DorisConfig.loadConfig(pluginConfig);
this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0;
log.info("restore checkpointId {}", lastCheckpointId);
log.info("labelPrefix " + dorisConfig.getLabelPrefix());
this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), lastCheckpointId);
- this.labelPrefix = dorisConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
+ this.labelPrefix =
+ dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC());
this.scheduledExecutorService =
new ScheduledThreadPoolExecutor(