You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@inlong.apache.org by pa...@apache.org on 2023/01/04 02:16:25 UTC
[inlong] branch master updated: [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c40021560 [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
c40021560 is described below
commit c400215603434e4fd4b6dff6104fda7b86c83a3b
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Wed Jan 4 10:16:19 2023 +0800
[INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
---
.../inlong/sort/base/dirty/DirtySinkHelper.java | 43 ++++++++++++++++++++++
.../internal/JdbcMultiBatchingOutputFormat.java | 3 +-
.../starrocks/manager/StarRocksSinkManager.java | 16 ++++----
.../table/sink/StarRocksDynamicSinkFunction.java | 2 +-
4 files changed, 55 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
index a962b974e..e815db856 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -18,8 +18,12 @@
package org.apache.inlong.sort.base.dirty;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,6 +97,45 @@ public class DirtySinkHelper<T> implements Serializable {
}
}
+ /**
+ * Handles a dirty record from a multiple-sink writer (per the method name and the callers in this commit).
+ * Rethrows {@code e} unless dirty data is ignored. Otherwise, if a dirty sink is configured, archives the
+ * record with labels, log tag and identifier resolved against the record's deserialized JSON.
+ *
+ * @param dirtyData the dirty record; the code below casts it to {@link RowData} and reads field 0 as the
+ * serialized payload (NOTE(review): assumption encoded by the cast; confirm with every caller)
+ * @param dirtyType the failure category recorded when archiving succeeds
+ * @param e the original failure; rethrown when dirty data is not ignored, and used as the dirty message
+ * @param sinkMultipleFormat format name passed to {@link DynamicSchemaFormatFactory#getFormat}; the result
+ * is cast to {@link JsonDynamicSchemaFormat}, so any other format type throws ClassCastException
+ * @throws RuntimeException {@code e} itself (or wrapping it) when dirty data is not ignored, or wrapping a
+ * dirty-sink failure when side-output errors are not ignored
+ */
+ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
+ String sinkMultipleFormat) {
+ // NOTE(review): this lookup/cast runs before the ignoreDirty check and even when dirtySink is null;
+ // if it fails (e.g. a null or unknown format; confirm getFormat's behavior), that failure escapes instead of e.
+ JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+ (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ // Dirty data is not ignored: propagate the original failure, wrapping checked exceptions.
+ if (!dirtyOptions.ignoreDirty()) {
+ RuntimeException ex;
+ if (e instanceof RuntimeException) {
+ ex = (RuntimeException) e;
+ } else {
+ ex = new RuntimeException(e);
+ }
+ throw ex;
+ }
+ // Without a configured dirty sink, nothing further is done with the record.
+ if (dirtySink != null) {
+ JsonNode rootNode;
+ DirtyData.Builder<T> builder = DirtyData.builder();
+ try {
+ // Field 0 is deserialized as the JSON used to resolve labels/logTag/identifier below.
+ rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+ } catch (Exception ex) {
+ // NOTE(review): any failure here falls back to invoke() with DESERIALIZE_ERROR, dropping the caller's
+ // dirtyType. That includes ClassCastException for non-RowData input, such as the String rows and
+ // (presumably JsonNode) readTree results StarRocksSinkManager passes in this commit.
+ // It also forwards e rather than ex, so the deserialization cause is lost.
+ invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+ return;
+ }
+ try {
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+ .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ // Archiving failed: rethrow unless side-output errors are configured to be ignored; otherwise log and continue.
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOGGER.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+
public void setDirtyOptions(DirtyOptions dirtyOptions) {
this.dirtyOptions = dirtyOptions;
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index dd4be4d5c..67e9adac0 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -566,7 +566,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()),
1L, true);
if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) {
- dirtySinkHelper.invoke(record, DirtyType.RETRY_LOAD_ERROR, tableException);
+ dirtySinkHelper.invokeMultiple(record, DirtyType.RETRY_LOAD_ERROR, tableException,
+ sinkMultipleFormat);
}
tableExceptionMap.put(tableIdentifier, tableException);
if (stopWritingWhenTableException) {
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index d6a0f0f4e..19c762192 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -122,7 +122,8 @@ public class StarRocksSinkManager implements Serializable {
private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private transient SinkTableMetricData metricData;
- private final DirtySinkHelper<Object> dirtySinkHelper;;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
+ private String sinkMultipleFormat;
/**
* If a table writing throws exception, ignore it when receiving data later again
@@ -149,7 +150,6 @@ public class StarRocksSinkManager implements Serializable {
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.dirtySinkHelper = dirtySinkHelper;
-
init(flinkSchema);
}
@@ -159,7 +159,8 @@ public class StarRocksSinkManager implements Serializable {
StarRocksQueryVisitor starrocksQueryVisitor,
boolean multipleSink,
SchemaUpdateExceptionPolicy schemaUpdatePolicy,
- DirtySinkHelper<Object> dirtySinkHelper) {
+ DirtySinkHelper<Object> dirtySinkHelper,
+ String multipleformat) {
this.sinkOptions = sinkOptions;
this.jdbcConnProvider = jdbcConnProvider;
this.starrocksQueryVisitor = starrocksQueryVisitor;
@@ -168,7 +169,7 @@ public class StarRocksSinkManager implements Serializable {
this.schemaUpdatePolicy = schemaUpdatePolicy;
this.dirtySinkHelper = dirtySinkHelper;
-
+ this.sinkMultipleFormat = multipleformat;
init(flinkSchema);
}
@@ -450,12 +451,13 @@ public class StarRocksSinkManager implements Serializable {
// archive dirty data
if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) {
for (byte[] row : flushData.getBuffer()) {
- dirtySinkHelper.invoke(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e);
+ dirtySinkHelper.invokeMultiple(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e,
+ sinkMultipleFormat);
}
} else if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) {
for (byte[] row : flushData.getBuffer()) {
- dirtySinkHelper.invoke(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)),
- DirtyType.BATCH_LOAD_ERROR, e);
+ dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)),
+ DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat);
}
}
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 16b985595..089c86e84 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -124,7 +124,7 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme
StarRocksQueryVisitor starrocksQueryVisitor = new StarRocksQueryVisitor(jdbcConnProvider,
sinkOptions.getDatabaseName(), sinkOptions.getTableName());
this.sinkManager = new StarRocksSinkManager(sinkOptions, schema, jdbcConnProvider, starrocksQueryVisitor,
- multipleSink, schemaUpdatePolicy, dirtySinkHelper);
+ multipleSink, schemaUpdatePolicy, dirtySinkHelper, sinkMultipleFormat);
rowTransformer.setStarRocksColumns(starrocksQueryVisitor.getFieldMapping());
rowTransformer.setTableSchema(schema);