You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zh...@apache.org on 2023/05/30 11:02:44 UTC

[seatunnel] branch dev updated: [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6672e9407 [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
6672e9407 is described below

commit 6672e940773224d0b4c6bc40a789126cc8c69dbd
Author: Carl-Zhou-CN <67...@users.noreply.github.com>
AuthorDate: Tue May 30 19:02:38 2023 +0800

    [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
    
    Co-authored-by: zhouyao <ya...@marketingforce.com>
---
 .../org/apache/seatunnel/connectors/doris/sink/DorisSink.java | 11 +++++++++--
 .../connectors/doris/sink/writer/DorisSinkWriter.java         |  6 ++++--
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 018eb44bd..2c6d6ae74 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.doris.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -54,6 +55,7 @@ public class DorisSink
 
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
+    private String jobId;
 
     @Override
     public String getPluginName() {
@@ -78,6 +80,11 @@ public class DorisSink
         }
     }
 
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobId = jobContext.getJobId();
+    }
+
     @Override
     public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
@@ -93,7 +100,7 @@ public class DorisSink
             SinkWriter.Context context) throws IOException {
         DorisSinkWriter dorisSinkWriter =
                 new DorisSinkWriter(
-                        context, Collections.emptyList(), seaTunnelRowType, pluginConfig);
+                        context, Collections.emptyList(), seaTunnelRowType, pluginConfig, jobId);
         dorisSinkWriter.initializeLoad(Collections.emptyList());
         return dorisSinkWriter;
     }
@@ -102,7 +109,7 @@ public class DorisSink
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
             SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
         DorisSinkWriter dorisWriter =
-                new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig);
+                new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig, jobId);
         dorisWriter.initializeLoad(states);
         return dorisWriter;
     }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 744db83e6..ac0927f08 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -78,13 +78,15 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
             SinkWriter.Context context,
             List<DorisSinkState> state,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig) {
+            Config pluginConfig,
+            String jobId) {
         this.dorisConfig = DorisConfig.loadConfig(pluginConfig);
         this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0;
         log.info("restore checkpointId {}", lastCheckpointId);
         log.info("labelPrefix " + dorisConfig.getLabelPrefix());
         this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), lastCheckpointId);
-        this.labelPrefix = dorisConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
+        this.labelPrefix =
+                dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
         this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC());
         this.scheduledExecutorService =
                 new ScheduledThreadPoolExecutor(