You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/04/10 01:33:26 UTC
[doris-flink-connector] branch master updated: improve log print (#127)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b2a6435 improve log print (#127)
b2a6435 is described below
commit b2a64352a2c3e8d6a6f871793f9a0f6f4aeba029
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 10 09:33:21 2023 +0800
improve log print (#127)
Co-authored-by: wudi <>
---
.../org/apache/doris/flink/sink/committer/DorisCommitter.java | 1 +
.../org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 4 ++--
.../java/org/apache/doris/flink/sink/writer/DorisWriter.java | 10 ++++++----
.../apache/doris/flink/sink/writer/TestDorisStreamLoad.java | 6 +++---
4 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index acbd310..3b61d82 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -88,6 +88,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
//hostPort
String hostPort = committable.getHostPort();
+ LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
int retry = 0;
while (retry++ <= maxRetry) {
//get latest-url
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index dc33fce..e7ade2f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -210,9 +210,9 @@ public class DorisStreamLoad implements Serializable {
throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
}
- public RespContent stopLoad() throws IOException{
+ public RespContent stopLoad(String label) throws IOException{
recordStream.endInput();
- LOG.info("stream load stopped.");
+ LOG.info("stream load stopped for {} on host {}", label, hostPort);
Preconditions.checkState(pendingLoadFuture != null);
try {
return handlePreCommitResponse(pendingLoadFuture.get());
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 6f0bbed..ac8fdaf 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -73,9 +73,9 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
private final transient ScheduledExecutorService scheduledExecutorService;
private transient Thread executorThread;
private transient volatile Exception loadException = null;
-
private List<BackendV2.BackendRowV2> backends;
private long pos;
+ private String currentLabel;
public DorisWriter(Sink.InitContext initContext,
List<DorisWriterState> state,
@@ -122,7 +122,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
}
// get main work thread.
executorThread = Thread.currentThread();
- dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
+ this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1);
+ dorisStreamLoad.startLoad(currentLabel);
// when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
}
@@ -142,7 +143,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
// disable exception checker before stop load.
loading = false;
Preconditions.checkState(dorisStreamLoad != null);
- RespContent respContent = dorisStreamLoad.stopLoad();
+ RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
throw new DorisRuntimeException(errMsg);
@@ -160,7 +161,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
Preconditions.checkState(dorisStreamLoad != null);
// dynamic refresh BE node
this.dorisStreamLoad.setHostPort(getAvailableBackend());
- this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
+ this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
+ this.dorisStreamLoad.startLoad(currentLabel);
this.loading = true;
return Collections.singletonList(dorisWriterState);
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index 6a182bd..d6f0967 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -92,7 +92,7 @@ public class TestDorisStreamLoad {
DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
dorisStreamLoad.startLoad("1");
dorisStreamLoad.writeRecord(writeBuffer);
- dorisStreamLoad.stopLoad();
+ dorisStreamLoad.stopLoad("label");
byte[] buff = new byte[4];
int n = dorisStreamLoad.getRecordStream().read(buff);
dorisStreamLoad.getRecordStream().read(new byte[4]);
@@ -112,7 +112,7 @@ public class TestDorisStreamLoad {
dorisStreamLoad.startLoad("1");
dorisStreamLoad.writeRecord(writeBuffer);
dorisStreamLoad.writeRecord(writeBuffer);
- dorisStreamLoad.stopLoad();
+ dorisStreamLoad.stopLoad("label");
byte[] buff = new byte[9];
int n = dorisStreamLoad.getRecordStream().read(buff);
int ret = dorisStreamLoad.getRecordStream().read(new byte[9]);
@@ -137,7 +137,7 @@ public class TestDorisStreamLoad {
dorisStreamLoad.startLoad("1");
dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
- dorisStreamLoad.stopLoad();
+ dorisStreamLoad.stopLoad("label");
byte[] buff = new byte[expectBuffer.length];
int n = dorisStreamLoad.getRecordStream().read(buff);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org