You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/04/10 01:33:26 UTC

[doris-flink-connector] branch master updated: improve log print (#127)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b2a6435  improve log print (#127)
b2a6435 is described below

commit b2a64352a2c3e8d6a6f871793f9a0f6f4aeba029
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 10 09:33:21 2023 +0800

    improve log print (#127)
    
    Co-authored-by: wudi <>
---
 .../org/apache/doris/flink/sink/committer/DorisCommitter.java  |  1 +
 .../org/apache/doris/flink/sink/writer/DorisStreamLoad.java    |  4 ++--
 .../java/org/apache/doris/flink/sink/writer/DorisWriter.java   | 10 ++++++----
 .../apache/doris/flink/sink/writer/TestDorisStreamLoad.java    |  6 +++---
 4 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index acbd310..3b61d82 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -88,6 +88,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
         //hostPort
         String hostPort = committable.getHostPort();
 
+        LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
         int retry = 0;
         while (retry++ <= maxRetry) {
             //get latest-url
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index dc33fce..e7ade2f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -210,9 +210,9 @@ public class DorisStreamLoad implements Serializable {
         throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
     }
 
-    public RespContent stopLoad() throws IOException{
+    public RespContent stopLoad(String label) throws IOException{
         recordStream.endInput();
-        LOG.info("stream load stopped.");
+        LOG.info("stream load stopped for {} on host {}", label, hostPort);
         Preconditions.checkState(pendingLoadFuture != null);
         try {
            return handlePreCommitResponse(pendingLoadFuture.get());
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 6f0bbed..ac8fdaf 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -73,9 +73,9 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
-
     private List<BackendV2.BackendRowV2> backends;
     private long pos;
+    private String currentLabel;
 
     public DorisWriter(Sink.InitContext initContext,
                        List<DorisWriterState> state,
@@ -122,7 +122,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
+        this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1);
+        dorisStreamLoad.startLoad(currentLabel);
         // when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -142,7 +143,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
         // disable exception checker before stop load.
         loading = false;
         Preconditions.checkState(dorisStreamLoad != null);
-        RespContent respContent = dorisStreamLoad.stopLoad();
+        RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
         if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
             String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
             throw new DorisRuntimeException(errMsg);
@@ -160,7 +161,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
         Preconditions.checkState(dorisStreamLoad != null);
         // dynamic refresh BE node
         this.dorisStreamLoad.setHostPort(getAvailableBackend());
-        this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
+        this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
+        this.dorisStreamLoad.startLoad(currentLabel);
         this.loading = true;
         return Collections.singletonList(dorisWriterState);
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index 6a182bd..d6f0967 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -92,7 +92,7 @@ public class TestDorisStreamLoad {
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
         dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord(writeBuffer);
-        dorisStreamLoad.stopLoad();
+        dorisStreamLoad.stopLoad("label");
         byte[] buff = new byte[4];
         int n = dorisStreamLoad.getRecordStream().read(buff);
         dorisStreamLoad.getRecordStream().read(new byte[4]);
@@ -112,7 +112,7 @@ public class TestDorisStreamLoad {
         dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.writeRecord(writeBuffer);
-        dorisStreamLoad.stopLoad();
+        dorisStreamLoad.stopLoad("label");
         byte[] buff = new byte[9];
         int n = dorisStreamLoad.getRecordStream().read(buff);
         int ret = dorisStreamLoad.getRecordStream().read(new byte[9]);
@@ -137,7 +137,7 @@ public class TestDorisStreamLoad {
         dorisStreamLoad.startLoad("1");
         dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
-        dorisStreamLoad.stopLoad();
+        dorisStreamLoad.stopLoad("label");
         byte[] buff = new byte[expectBuffer.length];
         int n = dorisStreamLoad.getRecordStream().read(buff);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org