You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/01 11:56:21 UTC
[inlong] branch master updated: [INLONG-6345][Sort] Add metrics for Doris connector (#6346)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5c48e6fe6 [INLONG-6345][Sort] Add metrics for Doris connector (#6346)
5c48e6fe6 is described below
commit 5c48e6fe67fd635237e4ac1133cbf167a4427ec0
Author: kuansix <49...@qq.com>
AuthorDate: Tue Nov 1 19:56:15 2022 +0800
[INLONG-6345][Sort] Add metrics for Doris connector (#6346)
---
.../sort/protocol/node/load/DorisLoadNode.java | 3 +-
.../doris/internal/GenericDorisSinkFunction.java | 78 ++++++++++++++++++
.../inlong/sort/doris/model/RespContent.java | 96 ++++++++++++++++++++++
.../table/DorisDynamicSchemaOutputFormat.java | 87 ++++++++++++++++++--
.../sort/doris/table/DorisDynamicTableFactory.java | 10 ++-
.../sort/doris/table/DorisDynamicTableSink.java | 23 +++++-
.../inlong/sort/doris/table/DorisStreamLoad.java | 5 +-
licenses/inlong-sort-connectors/LICENSE | 1 +
8 files changed, 290 insertions(+), 13 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
index 583970430..a281f24fe 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
@@ -27,6 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
import org.apache.inlong.sort.protocol.constant.DorisConstant;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -53,7 +54,7 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP
@JsonInclude(Include.NON_NULL)
@Data
@NoArgsConstructor
-public class DorisLoadNode extends LoadNode implements Serializable {
+public class DorisLoadNode extends LoadNode implements InlongMetric, Serializable {
private static final long serialVersionUID = -8002903269814211382L;
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java
new file mode 100644
index 000000000..74671fb37
--- /dev/null
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.doris.table.DorisDynamicSchemaOutputFormat;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * A generic SinkFunction for Doris.
+ *
+ * Supports the `inlong.metric` option by forwarding checkpoint callbacks to the wrapped output format.
+ */
+@Internal
+public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
+ implements CheckpointedFunction {
+
+ private final DorisDynamicSchemaOutputFormat<T> outputFormat;
+
+ public GenericDorisSinkFunction(@Nonnull DorisDynamicSchemaOutputFormat<T> outputFormat) {
+ this.outputFormat = Preconditions.checkNotNull(outputFormat);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ RuntimeContext ctx = getRuntimeContext();
+ outputFormat.setRuntimeContext(ctx);
+ outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void invoke(T value, Context context) throws IOException {
+ outputFormat.writeRecord(value);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ outputFormat.setRuntimeContext(getRuntimeContext());
+ outputFormat.initializeState(context);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ outputFormat.flush();
+ outputFormat.snapshotState(context);
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputFormat.close();
+ }
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java
new file mode 100644
index 000000000..5a43f05aa
--- /dev/null
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.model;
+
+import lombok.Data;
+import org.apache.doris.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.doris.shaded.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * RespContent, copied from {@link org.apache.doris.flink.rest.models.RespContent}
+ **/
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RespContent {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @JsonProperty(value = "TxnId")
+ private int txnId;
+
+ @JsonProperty(value = "Label")
+ private String label;
+
+ @JsonProperty(value = "Status")
+ private String status;
+
+ @JsonProperty(value = "ExistingJobStatus")
+ private String existingJobStatus;
+
+ @JsonProperty(value = "Message")
+ private String message;
+
+ @JsonProperty(value = "NumberTotalRows")
+ private long numberTotalRows;
+
+ @JsonProperty(value = "NumberLoadedRows")
+ private long numberLoadedRows;
+
+ @JsonProperty(value = "NumberFilteredRows")
+ private int numberFilteredRows;
+
+ @JsonProperty(value = "NumberUnselectedRows")
+ private int numberUnselectedRows;
+
+ @JsonProperty(value = "LoadBytes")
+ private long loadBytes;
+
+ @JsonProperty(value = "LoadTimeMs")
+ private int loadTimeMs;
+
+ @JsonProperty(value = "BeginTxnTimeMs")
+ private int beginTxnTimeMs;
+
+ @JsonProperty(value = "StreamLoadPutTimeMs")
+ private int streamLoadPutTimeMs;
+
+ @JsonProperty(value = "ReadDataTimeMs")
+ private int readDataTimeMs;
+
+ @JsonProperty(value = "WriteDataTimeMs")
+ private int writeDataTimeMs;
+
+ @JsonProperty(value = "CommitAndPublishTimeMs")
+ private int commitAndPublishTimeMs;
+
+ @JsonProperty(value = "ErrorURL")
+ private String errorURL;
+
+ @Override
+ public String toString() {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ return "";
+ }
+ }
+}
+
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 9e8142ae7..8e4100eb6 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -26,13 +26,24 @@ import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.doris.model.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +60,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
/**
* DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
* It is used in the multiple sink scenario, in this scenario, we directly convert the data format by
@@ -91,13 +106,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private transient ScheduledFuture<?> scheduledFuture;
private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private transient SinkMetricData metricData;
+ private transient ListState<MetricState> metricStateListState;
+ private transient MetricState metricState;
+
public DorisDynamicSchemaOutputFormat(DorisOptions option,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
String dynamicSchemaFormat,
String databasePattern,
String tablePattern,
- boolean ignoreSingleTableErrors) {
+ boolean ignoreSingleTableErrors,
+ String inlongMetric,
+ String auditHostAndPorts) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -105,6 +128,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
/**
@@ -133,6 +158,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
executionOptions.getStreamLoadProp());
jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = new ScheduledThreadPoolExecutor(1,
new ExecutorThreadFactory("doris-streamload-output-format"));
@@ -276,7 +311,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
String loadValue = null;
try {
loadValue = OBJECT_MAPPER.writeValueAsString(kvs.getValue());
- load(kvs.getKey(), loadValue);
+ RespContent respContent = load(kvs.getKey(), loadValue);
+ try {
+ if (null != metricData && null != respContent) {
+ metricData.invoke(respContent.getNumberLoadedRows(), respContent.getLoadBytes());
+ }
+ } catch (Exception e) {
+ LOG.warn("metricData invoke get err:", e);
+ }
LOG.info("load {} records to tableIdentifier: {}", kvs.getValue().size(), kvs.getKey());
writeOutNum.addAndGet(kvs.getValue().size());
// Clean the data that has been loaded.
@@ -320,11 +362,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return hasRecords;
}
- private void load(String tableIdentifier, String result) throws IOException {
+ private RespContent load(String tableIdentifier, String result) throws IOException {
String[] tableWithDb = tableIdentifier.split("\\.");
+ RespContent respContent = null;
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
- dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
+ respContent = dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
break;
} catch (StreamLoadException e) {
LOG.error("doris sink error, retry times = {}", i, e);
@@ -342,6 +385,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
}
}
+ return respContent;
}
private String getBackend() throws IOException {
@@ -354,6 +398,26 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
}
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ if (metricData != null && metricStateListState != null) {
+ MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ if (this.inlongMetric != null) {
+ this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+ })));
+ }
+ if (context.isRestored()) {
+ metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+ }
+
/**
* Builder for {@link DorisDynamicSchemaOutputFormat}.
*/
@@ -366,6 +430,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private String databasePattern;
private String tablePattern;
private boolean ignoreSingleTableErrors;
+ private String inlongMetric;
+ private String auditHostAndPorts;
public Builder() {
this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -417,11 +483,22 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return this;
}
+ public DorisDynamicSchemaOutputFormat.Builder setInlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
+ public DorisDynamicSchemaOutputFormat.Builder setAuditHostAndPorts(String auditHostAndPorts) {
+ this.auditHostAndPorts = auditHostAndPorts;
+ return this;
+ }
+
@SuppressWarnings({"rawtypes"})
public DorisDynamicSchemaOutputFormat build() {
return new DorisDynamicSchemaOutputFormat(
optionsBuilder.build(), readOptions, executionOptions,
- dynamicSchemaFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
+ dynamicSchemaFormat, databasePattern, tablePattern,
+ ignoreSingleTableErrors, inlongMetric, auditHostAndPorts);
}
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 98c08c753..f3c263f93 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -59,6 +59,8 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
/**
* This class copy from {@link org.apache.doris.flink.table.DorisDynamicTableFactory}
@@ -210,6 +212,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_ENABLE);
options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
+ options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
+ options.add(FactoryUtil.SINK_PARALLELISM);
return options;
}
@@ -296,13 +301,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
+ String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
+ String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
+ Integer parallelism = helper.getOptions().getOptional(FactoryUtil.SINK_PARALLELISM).orElse(1);
// create and return dynamic table sink
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
physicalSchema, multipleSink, sinkMultipleFormat, databasePattern,
- tablePattern, ignoreSingleTableErrors);
+ tablePattern, ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, parallelism);
}
private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat,
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index bc847a398..8d961fdb5 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -25,7 +25,9 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction;
/**
* DorisDynamicTableSink copy from {@link org.apache.doris.flink.table.DorisDynamicTableSink}
@@ -42,6 +44,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final String databasePattern;
private final String tablePattern;
private final boolean ignoreSingleTableErrors;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private final Integer parallelism;
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
@@ -51,7 +56,10 @@ public class DorisDynamicTableSink implements DynamicTableSink {
String sinkMultipleFormat,
String databasePattern,
String tablePattern,
- boolean ignoreSingleTableErrors) {
+ boolean ignoreSingleTableErrors,
+ String inlongMetric,
+ String auditHostAndPorts,
+ Integer parallelism) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -61,6 +69,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.parallelism = parallelism;
}
@Override
@@ -96,14 +107,18 @@ public class DorisDynamicTableSink implements DynamicTableSink {
.setDatabasePattern(databasePattern)
.setTablePattern(tablePattern)
.setDynamicSchemaFormat(sinkMultipleFormat)
- .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
- return OutputFormatProvider.of(builder.build());
+ .setIgnoreSingleTableErrors(ignoreSingleTableErrors)
+ .setInlongMetric(inlongMetric)
+ .setAuditHostAndPorts(auditHostAndPorts);
+ return SinkFunctionProvider.of(
+ new GenericDorisSinkFunction<>(builder.build()), parallelism);
}
@Override
public DynamicTableSink copy() {
return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema,
- multipleSink, sinkMultipleFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
+ multipleSink, sinkMultipleFormat, databasePattern, tablePattern,
+ ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, parallelism);
}
@Override
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
index 94b075ef5..1f688bebb 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sort.doris.table;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.exception.StreamLoadException;
-import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64;
@@ -32,6 +31,7 @@ import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.apache.inlong.sort.doris.model.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +80,7 @@ public class DorisStreamLoad implements Serializable {
this.httpClient = httpClientBuilder.build();
}
- public void load(String db, String tbl, String value) throws StreamLoadException {
+ public RespContent load(String db, String tbl, String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(db, tbl, value);
LOG.info("Streamload Response:{}", loadResponse);
if (loadResponse.status != 200) {
@@ -93,6 +93,7 @@ public class DorisStreamLoad implements Serializable {
respContent.getErrorURL());
throw new StreamLoadException(errMsg);
}
+ return respContent;
} catch (IOException e) {
throw new StreamLoadException(e);
}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 53beba554..95793e251 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -608,6 +608,7 @@
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
+ inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java
Source : org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 (Please note that the software have been modified.)
License : https://github.com/apache/doris-flink-connector/blob/1.13_2.11-1.0.3/LICENSE.txt