You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/24 03:37:10 UTC
[inlong] branch master updated: [INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris (#6541)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 09f085d61 [INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris (#6541)
09f085d61 is described below
commit 09f085d61fe6525b89240a2f9c45b3dfe139e68f
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Thu Nov 24 11:37:05 2022 +0800
[INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris (#6541)
* Support metrics and restore metrics for single-table of Doris
* Change some codes format and fix the conflict
Co-authored-by: healchow <he...@gmail.com>
---
.../table/DorisDynamicSchemaOutputFormat.java | 229 ++++++++++++++++++---
.../sort/doris/table/DorisDynamicTableSink.java | 54 +----
.../inlong/sort/doris/util/DorisParseUtils.java | 70 +++++++
3 files changed, 284 insertions(+), 69 deletions(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index b7e0188e1..3f2f9af3f 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -24,6 +24,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.state.ListState;
@@ -36,6 +37,8 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
@@ -45,22 +48,28 @@ import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.doris.model.RespContent;
+import org.apache.inlong.sort.doris.util.DorisParseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.StringJoiner;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -85,6 +94,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
* Mark the record for not delete
*/
private static final String DORIS_DELETE_FALSE = "0";
+ private static final String FORMAT_JSON_VALUE = "json";
+ private static final String FORMAT_KEY = "format";
+ private static final String FIELD_DELIMITER_KEY = "column_separator";
+ private static final String FIELD_DELIMITER_DEFAULT = "\t";
+ private static final String LINE_DELIMITER_KEY = "line_delimiter";
+ private static final String LINE_DELIMITER_DEFAULT = "\n";
+ private static final String NULL_VALUE = "\\N";
+ private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
+ private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
+ private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
@SuppressWarnings({"rawtypes"})
private final Map<String, List> batchMap = new HashMap<>();
private final Map<String, String> columnsMap = new HashMap<>();
@@ -92,10 +111,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
- private final String databasePattern;
- private final String tablePattern;
- private final String dynamicSchemaFormat;
- private final boolean ignoreSingleTableErrors;
private final Map<String, Exception> flushExceptionMap = new HashMap<>();
private final AtomicLong readInNum = new AtomicLong(0);
private final AtomicLong writeOutNum = new AtomicLong(0);
@@ -103,6 +118,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final AtomicLong ddlNum = new AtomicLong(0);
private final String inlongMetric;
private final String auditHostAndPorts;
+ private final boolean multipleSink;
+ private volatile String tableIdentifier;
+ private volatile String databasePattern;
+ private volatile String tablePattern;
+ private volatile String dynamicSchemaFormat;
+ private volatile boolean ignoreSingleTableErrors;
private long batchBytes = 0L;
private int size;
private DorisStreamLoad dorisStreamLoad;
@@ -114,6 +135,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private transient SinkMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
+ private volatile String[] fieldNames;
+ private volatile boolean jsonFormat;
+ private String keysType;
+ private volatile RowData.FieldGetter[] fieldGetters;
+ private String fieldDelimiter;
+ private String lineDelimiter;
+ private volatile LogicalType[] logicalTypes;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
DorisReadOptions readOptions,
@@ -123,7 +151,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
String tablePattern,
boolean ignoreSingleTableErrors,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ boolean multipleSink) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -131,10 +160,31 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+ this.multipleSink = multipleSink;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
+ /**
+  * Creates an output format that is bound to one table, described by its identifier,
+  * its field names and the logical types of those fields.
+  *
+  * @param option the doris options
+  * @param readOptions the doris read options
+  * @param executionOptions the doris execution options
+  * @param tableIdentifier the identifier of the table the records are buffered under
+  * @param logicalTypes the logical types of the fields, used in open() to create the field getters
+  * @param fieldNames the names of the fields
+  * @param inlongMetric the inlong metric labels
+  * @param auditHostAndPorts the inlong audit address
+  * @param multipleSink whether the format works in multiple sink mode
+  */
+ public DorisDynamicSchemaOutputFormat(DorisOptions option,
+         DorisReadOptions readOptions,
+         DorisExecutionOptions executionOptions,
+         String tableIdentifier,
+         LogicalType[] logicalTypes,
+         String[] fieldNames,
+         String inlongMetric,
+         String auditHostAndPorts,
+         boolean multipleSink) {
+     // doris settings
+     this.options = option;
+     this.readOptions = readOptions;
+     this.executionOptions = executionOptions;
+     // description of the table
+     this.tableIdentifier = tableIdentifier;
+     this.logicalTypes = logicalTypes;
+     this.fieldNames = fieldNames;
+     // sink mode and metric settings
+     this.multipleSink = multipleSink;
+     this.inlongMetric = inlongMetric;
+     this.auditHostAndPorts = auditHostAndPorts;
+ }
+
/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*
@@ -144,8 +194,48 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return new DorisDynamicSchemaOutputFormat.Builder();
}
+ /**
+  * Fetches the schema of the configured doris table and returns its keys type.
+  *
+  * @return the keys type reported by the fetched schema
+  * @throws RuntimeException if the schema cannot be fetched; the original
+  *         {@link DorisException} is attached as the cause
+  */
+ private String parseKeysType() {
+     try {
+         Schema schema = RestService.getSchema(options, readOptions, LOG);
+         return schema.getKeysType();
+     } catch (DorisException e) {
+         // keep the cause so that the failure can be diagnosed, as enableBatchDelete() does
+         throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier(), e);
+     }
+ }
+
+ /**
+  * Prepares the stream load properties for the single-table sink.
+  * <p>
+  * Reads the field and line delimiters (running them through
+  * {@link DorisParseUtils#escapeString(String)} when "escape_delimiters" is true), fills in the
+  * COLUMNS_KEY property from {@code fieldNames} when it is not configured, and appends the
+  * delete sign column to it when batch delete is enabled.
+  */
+ private void handleStreamLoadProp() {
+     Properties props = executionOptions.getStreamLoadProp();
+     boolean ifEscape = Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
+     this.fieldDelimiter = props.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
+     this.lineDelimiter = props.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+     if (ifEscape) {
+         this.fieldDelimiter = DorisParseUtils.escapeString(fieldDelimiter);
+         this.lineDelimiter = DorisParseUtils.escapeString(lineDelimiter);
+         props.remove(ESCAPE_DELIMITERS_KEY);
+     }
+
+     // add column key when fieldNames is not empty
+     if (!props.containsKey(COLUMNS_KEY) && fieldNames != null && fieldNames.length > 0) {
+         String columns = Arrays.stream(fieldNames)
+                 .map(item -> String.format("`%s`", item.trim().replace("`", "")))
+                 .collect(Collectors.joining(","));
+         props.put(COLUMNS_KEY, columns);
+     }
+
+     // if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__'
+     String columns = (String) props.get(COLUMNS_KEY);
+     // columns is still null when it was neither configured nor derived from fieldNames;
+     // in that case there is nothing to append the delete sign to
+     if (columns != null && !columns.contains(DORIS_DELETE_SIGN) && enableBatchDelete()) {
+         props.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
+     }
+ }
+
private boolean enableBatchDelete() {
- return executionOptions.getEnableDelete();
+ try {
+ Schema schema = RestService.getSchema(options, readOptions, LOG);
+ return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
+ } catch (DorisException e) {
+ throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier(), e);
+ }
}
@Override
@@ -154,13 +244,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- dorisStreamLoad = new DorisStreamLoad(
- getBackend(),
- options.getUsername(),
- options.getPassword(),
- executionOptions.getStreamLoadProp());
- jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
- DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+ Properties loadProps = executionOptions.getStreamLoadProp();
+ dorisStreamLoad = new DorisStreamLoad(getBackend(), options.getUsername(), options.getPassword(), loadProps);
+ if (!multipleSink) {
+ this.jsonFormat = true;
+ handleStreamLoadProp();
+ this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+ for (int i = 0; i < logicalTypes.length; i++) {
+ fieldGetters[i] = RowData.createFieldGetter(logicalTypes[i], i);
+ }
+ }
+
+ if (multipleSink && StringUtils.isNotBlank(dynamicSchemaFormat)) {
+ jsonDynamicSchemaFormat =
+ (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+ }
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withInlongAudit(auditHostAndPorts)
@@ -183,14 +281,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
private boolean checkFlushException(String tableIdentifier) {
- Exception flushException = flushExceptionMap.get(tableIdentifier);
- if (flushException == null) {
+ Exception ex = flushExceptionMap.get(tableIdentifier);
+ if (!multipleSink || ex == null) {
return false;
}
if (!ignoreSingleTableErrors) {
- throw new RuntimeException(
- String.format("Writing records to streamload of tableIdentifier:%s failed.", tableIdentifier),
- flushException);
+ throw new RuntimeException("Writing records to streamload failed, tableIdentifier=" + tableIdentifier, ex);
}
return true;
}
@@ -205,8 +301,54 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
}
+ /**
+  * Buffers one record of the single-table sink into the batch kept under {@code tableIdentifier}.
+  * <p>
+  * A {@link RowData} is converted field by field: in json mode into a map keyed by field name
+  * (a null field keeps a null value), otherwise into one line joined by the field delimiter in
+  * which a null field is written as {@code \N}. A {@link String} is buffered unchanged.
+  * {@code batchBytes} grows by the UTF-8 size of the buffered values (and, in json mode, of the
+  * field names).
+  *
+  * @param row the record to buffer, either a {@link RowData} or a {@link String}
+  * @throws RuntimeException if the record is of any other type
+  */
+ public void addSingle(T row) {
+     final Object element;
+     if (row instanceof String) {
+         batchBytes += ((String) row).getBytes(StandardCharsets.UTF_8).length;
+         element = row;
+     } else if (row instanceof RowData) {
+         RowData record = (RowData) row;
+         Map<String, String> jsonFields = new HashMap<>();
+         StringJoiner delimitedLine = new StringJoiner(this.fieldDelimiter);
+         for (int idx = 0; idx < record.getArity() && idx < fieldGetters.length; ++idx) {
+             Object field = fieldGetters[idx].getFieldOrNull(record);
+             if (!jsonFormat) {
+                 String text = field == null ? NULL_VALUE : field.toString();
+                 delimitedLine.add(text);
+                 batchBytes += text.getBytes(StandardCharsets.UTF_8).length;
+                 continue;
+             }
+             String text = field == null ? null : field.toString();
+             jsonFields.put(this.fieldNames[idx], text);
+             batchBytes += this.fieldNames[idx].getBytes(StandardCharsets.UTF_8).length;
+             if (text != null) {
+                 batchBytes += text.getBytes(StandardCharsets.UTF_8).length;
+             }
+         }
+         // add doris delete sign
+         // NOTE(review): enableBatchDelete() fetches the table schema on every call, so this runs
+         // once per record - presumably a remote request; consider caching the result
+         if (enableBatchDelete()) {
+             if (jsonFormat) {
+                 jsonFields.put(DORIS_DELETE_SIGN, DorisParseUtils.parseDeleteSign(record.getRowKind()));
+             } else {
+                 delimitedLine.add(DorisParseUtils.parseDeleteSign(record.getRowKind()));
+             }
+         }
+         element = jsonFormat ? jsonFields : delimitedLine.toString();
+     } else {
+         throw new RuntimeException("The type of element should be 'RowData' or 'String' only.");
+     }
+     // both accepted record types end up in the batch of the configured table
+     List bucket = batchMap.getOrDefault(tableIdentifier, new ArrayList<String>());
+     bucket.add(element);
+     batchMap.putIfAbsent(tableIdentifier, bucket);
+ }
+
@SuppressWarnings({"unchecked"})
private void addBatch(T row) throws IOException {
+ if (!multipleSink) {
+ addSingle(row);
+ return;
+ }
if (row instanceof RowData) {
RowData rowData = (RowData) row;
JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
@@ -273,7 +415,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
break;
default:
errorNum.incrementAndGet();
- throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
+ throw new RuntimeException("Unrecognized row kind:" + rowKind);
}
}
}
@@ -359,6 +501,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
flushing = false;
return;
}
+
for (Entry<String, List> kvs : batchMap.entrySet()) {
flushSingleTable(kvs.getKey(), kvs.getValue());
}
@@ -379,6 +522,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
if (checkFlushException(tableIdentifier) || values == null || values.isEmpty()) {
return;
}
+
String loadValue = null;
try {
loadValue = OBJECT_MAPPER.writeValueAsString(values);
@@ -390,7 +534,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
} catch (Exception e) {
LOG.warn("metricData invoke get err:", e);
}
- LOG.info("load {} records to tableIdentifier: {}", values.size(), tableIdentifier);
writeOutNum.addAndGet(values.size());
// Clean the data that has been loaded.
values.clear();
@@ -426,8 +569,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private RespContent load(String tableIdentifier, String result) throws IOException {
String[] tableWithDb = tableIdentifier.split("\\.");
RespContent respContent = null;
- // Dynamic set COLUMNS_KEY for tableIdentifier every time
- executionOptions.getStreamLoadProp().put(COLUMNS_KEY, columnsMap.get(tableIdentifier));
+ // Dynamic set COLUMNS_KEY for tableIdentifier every time for multiple sink scenario
+ if (multipleSink) {
+ executionOptions.getStreamLoadProp().put(COLUMNS_KEY, columnsMap.get(tableIdentifier));
+ }
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
respContent = dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
@@ -441,7 +586,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
dorisStreamLoad.setHostPort(getBackend());
LOG.warn("streamload error,switch be: {}",
dorisStreamLoad.getLoadUrlStr(tableWithDb[0], tableWithDb[1]), e);
- Thread.sleep(1000 * i);
+ Thread.sleep(1000L * i);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("unable to flush; interrupted while doing another attempt", e);
@@ -493,8 +638,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private String databasePattern;
private String tablePattern;
private boolean ignoreSingleTableErrors;
+ private boolean multipleSink;
private String inlongMetric;
private String auditHostAndPorts;
+ private String tableIdentifier;
+ private DataType[] fieldDataTypes;
+ private String[] fieldNames;
public Builder() {
this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -515,6 +664,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return this;
}
+ /**
+  * Sets the table identifier that build() passes to the format when multipleSink is false.
+  *
+  * @param tableIdentifier the identifier of the table
+  * @return this builder
+  */
+ public DorisDynamicSchemaOutputFormat.Builder setTableIdentifier(String tableIdentifier) {
+ this.tableIdentifier = tableIdentifier;
+ return this;
+ }
+
+ /**
+  * Sets the field data types; build() converts them to logical types when multipleSink is false.
+  *
+  * @param fieldDataTypes the data types of the fields
+  * @return this builder
+  */
+ public DorisDynamicSchemaOutputFormat.Builder setFieldDataTypes(DataType[] fieldDataTypes) {
+ this.fieldDataTypes = fieldDataTypes;
+ return this;
+ }
+
+ /**
+  * Sets the field names that build() passes to the format when multipleSink is false.
+  *
+  * @param fieldNames the names of the fields
+  * @return this builder
+  */
+ public DorisDynamicSchemaOutputFormat.Builder setFieldNames(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ return this;
+ }
+
public DorisDynamicSchemaOutputFormat.Builder setReadOptions(DorisReadOptions readOptions) {
this.readOptions = readOptions;
return this;
@@ -546,6 +710,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return this;
}
+ /**
+  * Sets the flag that decides which constructor build() uses: the single-table one when the
+  * flag is false, the pattern based one otherwise.
+  *
+  * @param multipleSink whether the format works in multiple sink mode
+  * @return this builder
+  */
+ public DorisDynamicSchemaOutputFormat.Builder setMultipleSink(boolean multipleSink) {
+ this.multipleSink = multipleSink;
+ return this;
+ }
+
public DorisDynamicSchemaOutputFormat.Builder setInlongMetric(String inlongMetric) {
this.inlongMetric = inlongMetric;
return this;
@@ -558,10 +727,20 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
@SuppressWarnings({"rawtypes"})
public DorisDynamicSchemaOutputFormat build() {
+ if (!multipleSink) {
+ LogicalType[] logicalTypes = Arrays.stream(fieldDataTypes)
+ .map(DataType::getLogicalType).toArray(LogicalType[]::new);
+
+ return new DorisDynamicSchemaOutputFormat(
+ optionsBuilder.setTableIdentifier(tableIdentifier).build(), readOptions, executionOptions,
+ tableIdentifier,
+ logicalTypes, fieldNames,
+ inlongMetric, auditHostAndPorts, multipleSink);
+ }
return new DorisDynamicSchemaOutputFormat(
optionsBuilder.build(), readOptions, executionOptions,
dynamicSchemaFormat, databasePattern, tablePattern,
- ignoreSingleTableErrors, inlongMetric, auditHostAndPorts);
+ ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, multipleSink);
}
}
-}
\ No newline at end of file
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index c6cdd4df6..76a53e659 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -17,23 +17,15 @@
package org.apache.inlong.sort.doris.table;
-import java.util.Properties;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.flink.table.DorisDynamicOutputFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* DorisDynamicTableSink copy from {@link org.apache.doris.flink.table.DorisDynamicTableSink}
@@ -41,8 +33,6 @@ import org.slf4j.LoggerFactory;
**/
public class DorisDynamicTableSink implements DynamicTableSink {
- private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class);
-
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
@@ -55,9 +45,6 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final String inlongMetric;
private final String auditHostAndPorts;
private final Integer parallelism;
- private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
- private static final String COLUMNS_KEY = "columns";
- private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
@@ -97,37 +84,25 @@ public class DorisDynamicTableSink implements DynamicTableSink {
@SuppressWarnings({"unchecked"})
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
- if (!multipleSink) {
- Properties loadProperties = executionOptions.getStreamLoadProp();
- // if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__'
- String columns = (String) loadProperties.get(COLUMNS_KEY);
- if (loadProperties.containsKey(COLUMNS_KEY) && !columns.contains(DORIS_DELETE_SIGN)
- && enableBatchDelete()) {
- loadProperties.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
- }
- DorisDynamicOutputFormat.Builder builder = DorisDynamicOutputFormat.builder()
- .setFenodes(options.getFenodes())
- .setUsername(options.getUsername())
- .setPassword(options.getPassword())
- .setTableIdentifier(options.getTableIdentifier())
- .setReadOptions(readOptions)
- .setExecutionOptions(executionOptions)
- .setFieldDataTypes(tableSchema.getFieldDataTypes())
- .setFieldNames(tableSchema.getFieldNames());
- return OutputFormatProvider.of(builder.build());
- }
DorisDynamicSchemaOutputFormat.Builder builder = DorisDynamicSchemaOutputFormat.builder()
.setFenodes(options.getFenodes())
.setUsername(options.getUsername())
.setPassword(options.getPassword())
.setReadOptions(readOptions)
.setExecutionOptions(executionOptions)
+ .setInlongMetric(inlongMetric)
+ .setAuditHostAndPorts(auditHostAndPorts)
+ .setTableIdentifier(options.getTableIdentifier())
+ .setFieldDataTypes(tableSchema.getFieldDataTypes())
+ .setFieldNames(tableSchema.getFieldNames())
+ .setInlongMetric(inlongMetric)
+ .setAuditHostAndPorts(auditHostAndPorts)
+ .setMultipleSink(multipleSink)
.setDatabasePattern(databasePattern)
.setTablePattern(tablePattern)
.setDynamicSchemaFormat(sinkMultipleFormat)
- .setIgnoreSingleTableErrors(ignoreSingleTableErrors)
- .setInlongMetric(inlongMetric)
- .setAuditHostAndPorts(auditHostAndPorts);
+ .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
+
return SinkFunctionProvider.of(
new GenericDorisSinkFunction<>(builder.build()), parallelism);
}
@@ -143,14 +118,5 @@ public class DorisDynamicTableSink implements DynamicTableSink {
public String asSummaryString() {
return "Doris Table Sink Of InLong";
}
-
- private boolean enableBatchDelete() {
- try {
- Schema schema = RestService.getSchema(options, readOptions, LOG);
- return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier(), e);
- }
- }
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
new file mode 100644
index 000000000..81f82762d
--- /dev/null
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.util;
+
+import org.apache.flink.types.RowKind;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A utility class primarily serving DorisDynamicSchemaOutputFormat
+ */
+public class DorisParseUtils {
+
+ /**
+ * Pattern of escape mode for hexadecimal characters, such as "hi\\x33hi\\x44hello".
+ */
+ private static final Pattern HEX_PATTERN = Pattern.compile("\\\\x(\\d{2})");
+
+ /**
+ * A utility function used to determine the DORIS_DELETE_SIGN for a row change.
+ *
+ * @param rowKind the row change
+ * @return the doris delete sign corresponding to the change
+ */
+ public static String parseDeleteSign(RowKind rowKind) {
+ if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
+ return "0";
+ } else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
+ return "1";
+ } else {
+ throw new RuntimeException("Unrecognized row kind: " + rowKind.toString());
+ }
+ }
+
+ /**
+ * A utility used to parse a string according to the given hexadecimal escape sequence.
+ * <p/>
+ * Example input: ""hi\\x33hi\\x44hello"" , where \x33 is '!', \x44 is ','
+ * Example output: "hi!hi,hello"
+ *
+ * @param hexStr hex string before parsing
+ * @return the parsed string
+ */
+ public static String escapeString(String hexStr) {
+ Matcher matcher = HEX_PATTERN.matcher(hexStr);
+ StringBuffer buf = new StringBuffer();
+ while (matcher.find()) {
+ matcher.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(matcher.group(1))));
+ }
+ matcher.appendTail(buf);
+
+ return buf.toString();
+ }
+
+}