You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by yu...@apache.org on 2022/10/31 09:20:03 UTC
[inlong] branch master updated: [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ca24d2ede [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
ca24d2ede is described below
commit ca24d2edea046611f8c77d70b1465b550f146aaa
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon Oct 31 17:19:57 2022 +0800
[INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
* [INLONG-6322][Sort] Fix incorrect data being written by the Doris connector in the sink-multiple scenario
* [INLONG-6322][Sort] Fix flushing update error
* [INLONG-6322][Sort] Optimize flush handling
* [INLONG-6322][Sort] Fix comment error
---
.../table/DorisDynamicSchemaOutputFormat.java | 68 +++++++++++++++++-----
1 file changed, 53 insertions(+), 15 deletions(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index c2db85789..9e8142ae7 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
@@ -76,11 +77,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final String tablePattern;
private final String dynamicSchemaFormat;
private final boolean ignoreSingleTableErrors;
- private final transient Map<String, Exception> flushExceptionMap = new HashMap<>();
+ private final Map<String, Exception> flushExceptionMap = new HashMap<>();
+ private final AtomicLong readInNum = new AtomicLong(0);
+ private final AtomicLong writeOutNum = new AtomicLong(0);
+ private final AtomicLong errorNum = new AtomicLong(0);
+ private final AtomicLong ddlNum = new AtomicLong(0);
private long batchBytes = 0L;
private int size;
private DorisStreamLoad dorisStreamLoad;
private transient volatile boolean closed = false;
+ private transient volatile boolean flushing = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
@@ -131,10 +137,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.scheduler = new ScheduledThreadPoolExecutor(1,
new ExecutorThreadFactory("doris-streamload-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
- synchronized (DorisDynamicSchemaOutputFormat.this) {
- if (!closed) {
- flush();
- }
+ if (!closed && !flushing) {
+ flush();
}
}, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}
@@ -158,7 +162,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
addBatch(row);
boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
|| batchBytes >= executionOptions.getMaxBatchBytes();
- if (valid) {
+ if (valid && !flushing) {
flush();
}
}
@@ -168,8 +172,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
if (row instanceof RowData) {
RowData rowData = (RowData) row;
JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
+ readInNum.incrementAndGet();
boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
if (isDDL) {
+ ddlNum.incrementAndGet();
// Ignore ddl change for now
return;
}
@@ -226,6 +232,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
break;
default:
+ errorNum.incrementAndGet();
throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
}
}
@@ -239,12 +246,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
-
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
-
try {
flush();
} catch (Exception e) {
@@ -258,27 +263,61 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
@SuppressWarnings({"rawtypes"})
public synchronized void flush() {
- if (batchMap.isEmpty()) {
+ flushing = true;
+ if (!hasRecords()) {
+ flushing = false;
return;
}
+ List<String> errorTables = new ArrayList<>();
for (Entry<String, List> kvs : batchMap.entrySet()) {
- if (checkFlushException(kvs.getKey())) {
+ if (checkFlushException(kvs.getKey()) || kvs.getValue().isEmpty()) {
continue;
}
+ String loadValue = null;
try {
- load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+ loadValue = OBJECT_MAPPER.writeValueAsString(kvs.getValue());
+ load(kvs.getKey(), loadValue);
+ LOG.info("load {} records to tableIdentifier: {}", kvs.getValue().size(), kvs.getKey());
+ writeOutNum.addAndGet(kvs.getValue().size());
+ // Clean the data that has been loaded.
+ kvs.getValue().clear();
} catch (Exception e) {
flushExceptionMap.put(kvs.getKey(), e);
+ errorNum.getAndAdd(kvs.getValue().size());
if (!ignoreSingleTableErrors) {
throw new RuntimeException(
- String.format("Writing records to streamload of tableIdentifier:%s failed.", kvs.getKey()),
- e);
+ String.format("Writing records to streamload of tableIdentifier:%s failed, the value: %s.",
+ kvs.getKey(), loadValue), e);
}
- batchMap.remove(kvs.getKey());
+ errorTables.add(kvs.getKey());
+ LOG.warn("The tableIdentifier: {} load failed and the data will be throw away in the future"
+ + " because the option 'sink.multiple.ignore-single-table-errors' is 'true'", kvs.getKey());
}
}
+ if (!errorTables.isEmpty()) {
+ // Clean the key that has errors
+ errorTables.forEach(batchMap::remove);
+ }
batchBytes = 0;
size = 0;
+ LOG.info("Doris sink statistics: readInNum: {}, writeOutNum: {}, errorNum: {}, ddlNum: {}",
+ readInNum.get(), writeOutNum.get(), errorNum.get(), ddlNum.get());
+ flushing = false;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private boolean hasRecords() {
+ if (batchMap.isEmpty()) {
+ return false;
+ }
+ boolean hasRecords = false;
+ for (List value : batchMap.values()) {
+ if (!value.isEmpty()) {
+ hasRecords = true;
+ break;
+ }
+ }
+ return hasRecords;
}
private void load(String tableIdentifier, String result) throws IOException {
@@ -286,7 +325,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
- batchMap.remove(tableIdentifier);
break;
} catch (StreamLoadException e) {
LOG.error("doris sink error, retry times = {}", i, e);