You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/10 08:15:49 UTC
[inlong] branch master updated: [INLONG-5446][Sort] Add reporting metric from JDBC to audit SDK and refactor according to… (#5449)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 22a32e8cb [INLONG-5446][Sort] Add reporting metric from JDBC to audit SDK and refactor according to… (#5449)
22a32e8cb is described below
commit 22a32e8cb925d7107218d049b10188e5b953e148
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Aug 10 16:15:45 2022 +0800
[INLONG-5446][Sort] Add reporting metric from JDBC to audit SDK and refactor according to… (#5449)
---
inlong-sort/sort-connectors/jdbc/pom.xml | 10 ++
.../jdbc/internal/JdbcBatchingOutputFormat.java | 114 +++++++++++++++------
.../jdbc/internal/TableJdbcUpsertOutputFormat.java | 12 ++-
.../apache/inlong/sort/jdbc/metric/MetricData.java | 111 --------------------
.../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 12 ++-
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 15 ++-
.../sort/jdbc/table/JdbcDynamicTableSink.java | 15 ++-
7 files changed, 128 insertions(+), 161 deletions(-)
diff --git a/inlong-sort/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-connectors/jdbc/pom.xml
index 5230f8ce5..79a3a1c2c 100644
--- a/inlong-sort/sort-connectors/jdbc/pom.xml
+++ b/inlong-sort/sort-connectors/jdbc/pom.xml
@@ -33,6 +33,16 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!--for clickhouse-->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 95044c8bf..55474def8 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -33,7 +33,8 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.jdbc.metric.MetricData;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +43,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -51,6 +54,14 @@ import java.util.function.Function;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
/**
* A JDBC outputFormat that supports batching records before writing records to database.
@@ -66,6 +77,7 @@ public class JdbcBatchingOutputFormat<
private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
private final String inLongMetric;
+ private final String auditHostAndPorts;
private transient JdbcExec jdbcStatementExecutor;
private transient int batchCount = 0;
private transient volatile boolean closed = false;
@@ -74,7 +86,10 @@ public class JdbcBatchingOutputFormat<
private transient volatile Exception flushException;
private transient RuntimeContext runtimeContext;
- private MetricData metricData;
+ private SinkMetricData sinkMetricData;
+ private String inLongGroupId;
+ private String inLongStreamId;
+ private transient AuditImp auditImp;
private Long dataSize = 0L;
private Long rowSize = 0L;
@@ -83,12 +98,14 @@ public class JdbcBatchingOutputFormat<
@Nonnull JdbcExecutionOptions executionOptions,
@Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
@Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
- String inLongMetric) {
+ String inLongMetric,
+ String auditHostAndPorts) {
super(connectionProvider);
this.executionOptions = checkNotNull(executionOptions);
this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
this.jdbcRecordExtractor = checkNotNull(recordExtractor);
this.inLongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
public static Builder builder() {
@@ -120,19 +137,28 @@ public class JdbcBatchingOutputFormat<
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
this.runtimeContext = getRuntimeContext();
- metricData = new MetricData(runtimeContext.getMetricGroup());
+ sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
- String[] inLongMetricArray = inLongMetric.split("&");
- String groupId = inLongMetricArray[0];
- String streamId = inLongMetricArray[1];
+ String[] inLongMetricArray = inLongMetric.split(DELIMITER);
+ inLongGroupId = inLongMetricArray[0];
+ inLongStreamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- metricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
- metricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
- metricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
- metricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
- metricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
- metricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
- "numRecordsOutPerSecond");
+ sinkMetricData.registerMetricsForDirtyBytes(inLongGroupId, inLongStreamId,
+ nodeId, DIRTY_BYTES);
+ sinkMetricData.registerMetricsForDirtyRecords(inLongGroupId, inLongStreamId,
+ nodeId, DIRTY_RECORDS);
+ sinkMetricData.registerMetricsForNumBytesOut(inLongGroupId, inLongStreamId,
+ nodeId, NUM_BYTES_OUT);
+ sinkMetricData.registerMetricsForNumRecordsOut(inLongGroupId, inLongStreamId,
+ nodeId, NUM_RECORDS_OUT);
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
+ NUM_BYTES_OUT_PER_SECOND);
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond(inLongGroupId, inLongStreamId, nodeId,
+ NUM_RECORDS_OUT_PER_SECOND);
+ }
+ if (auditHostAndPorts != null) {
+ AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
}
jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
@@ -146,20 +172,20 @@ public class JdbcBatchingOutputFormat<
if (!closed) {
try {
flush();
- if (metricData.getNumRecordsOut() != null) {
- metricData.getNumRecordsOut().inc(rowSize);
+ if (sinkMetricData.getNumRecordsOut() != null) {
+ sinkMetricData.getNumRecordsOut().inc(rowSize);
}
- if (metricData.getNumBytesOut() != null) {
- metricData.getNumBytesOut()
+ if (sinkMetricData.getNumBytesOut() != null) {
+ sinkMetricData.getNumBytesOut()
.inc(dataSize);
}
resetStateAfterFlush();
} catch (Exception e) {
- if (metricData.getDirtyRecords() != null) {
- metricData.getDirtyRecords().inc(rowSize);
+ if (sinkMetricData.getDirtyRecords() != null) {
+ sinkMetricData.getDirtyRecords().inc(rowSize);
}
- if (metricData.getDirtyBytes() != null) {
- metricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData.getDirtyBytes() != null) {
+ sinkMetricData.getDirtyBytes().inc(dataSize);
}
resetStateAfterFlush();
flushException = e;
@@ -190,33 +216,46 @@ public class JdbcBatchingOutputFormat<
}
}
+ private void outputMetricForAudit(long length) {
+ if (auditImp != null) {
+ auditImp.add(
+ AUDIT_SORT_INPUT,
+ inLongGroupId,
+ inLongStreamId,
+ System.currentTimeMillis(),
+ 1,
+ length);
+ }
+ }
+
@Override
public final synchronized void writeRecord(In record) throws IOException {
checkFlushException();
rowSize++;
dataSize = dataSize + record.toString().getBytes(StandardCharsets.UTF_8).length;
+ outputMetricForAudit(dataSize);
try {
addToBatch(record, jdbcRecordExtractor.apply(record));
batchCount++;
if (executionOptions.getBatchSize() > 0
&& batchCount >= executionOptions.getBatchSize()) {
flush();
- if (metricData.getNumRecordsOut() != null) {
- metricData.getNumRecordsOut().inc(rowSize);
+ if (sinkMetricData.getNumRecordsOut() != null) {
+ sinkMetricData.getNumRecordsOut().inc(rowSize);
}
- if (metricData.getNumBytesOut() != null) {
- metricData.getNumBytesOut()
+ if (sinkMetricData.getNumBytesOut() != null) {
+ sinkMetricData.getNumBytesOut()
.inc(dataSize);
}
resetStateAfterFlush();
}
} catch (Exception e) {
- if (metricData.getDirtyRecords() != null) {
- metricData.getDirtyRecords().inc(rowSize);
+ if (sinkMetricData.getDirtyRecords() != null) {
+ sinkMetricData.getDirtyRecords().inc(rowSize);
}
- if (metricData.getDirtyBytes() != null) {
- metricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData.getDirtyBytes() != null) {
+ sinkMetricData.getDirtyBytes().inc(dataSize);
}
resetStateAfterFlush();
throw new IOException("Writing records to JDBC failed.", e);
@@ -346,6 +385,7 @@ public class JdbcBatchingOutputFormat<
private String[] keyFields;
private int[] fieldTypes;
private String inLongMetric;
+ private String auditHostAndPorts;
private JdbcExecutionOptions.Builder executionOptionsBuilder =
JdbcExecutionOptions.builder();
@@ -389,6 +429,14 @@ public class JdbcBatchingOutputFormat<
return this;
}
+ /**
+ * optional, delimiter-separated audit proxy addresses; audit reporting is skipped when null.
+ */
+ public Builder setAuditHostAndPorts(String auditHostAndPorts) {
+ this.auditHostAndPorts = auditHostAndPorts;
+ return this;
+ }
+
/**
* optional, flush max size (includes all append, upsert and delete records), over this
* number of records, will flush data.
@@ -436,7 +484,8 @@ public class JdbcBatchingOutputFormat<
new SimpleJdbcConnectionProvider(options),
dml,
executionOptionsBuilder.build(),
- inLongMetric);
+ inLongMetric,
+ auditHostAndPorts);
} else {
// warn: don't close over builder fields
String sql =
@@ -457,7 +506,8 @@ public class JdbcBatchingOutputFormat<
Preconditions.checkArgument(tuple2.f0);
return tuple2.f1;
},
- inLongMetric);
+ inLongMetric,
+ auditHostAndPorts);
}
}
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 3d293ada1..4ef1ff2e3 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -58,13 +58,15 @@ class TableJdbcUpsertOutputFormat
JdbcConnectionProvider connectionProvider,
JdbcDmlOptions dmlOptions,
JdbcExecutionOptions batchOptions,
- String inLongMetric) {
+ String inLongMetric,
+ String auditHostAndPorts) {
this(
connectionProvider,
batchOptions,
ctx -> createUpsertRowExecutor(dmlOptions, ctx),
ctx -> createDeleteExecutor(dmlOptions, ctx),
- inLongMetric);
+ inLongMetric,
+ auditHostAndPorts);
}
@VisibleForTesting
@@ -74,9 +76,11 @@ class TableJdbcUpsertOutputFormat
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
deleteStatementExecutorFactory,
- String inLongMetric
+ String inLongMetric,
+ String auditHostAndPorts
) {
- super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1, inLongMetric);
+ super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1,
+ inLongMetric, auditHostAndPorts);
this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java
deleted file mode 100644
index af136efcf..000000000
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/metric/MetricData.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.jdbc.metric;
-
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
-
-/**
- * A collection class for handling metrics
- */
-public class MetricData {
-
- private static Integer TIME_SPAN_IN_SECONDS = 60;
- private static String STREAM_ID = "streamId";
- private static String GROUP_ID = "groupId";
- private static String NODE_ID = "nodeId";
- private final MetricGroup metricGroup;
- private Counter numRecordsOut;
- private Counter numBytesOut;
- private Counter dirtyRecords;
- private Counter dirtyBytes;
- private Meter numRecordsOutPerSecond;
- private Meter numBytesOutPerSecond;
-
- public MetricData(MetricGroup metricGroup) {
- this.metricGroup = metricGroup;
- }
-
- public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
- numRecordsOut =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
- }
-
- public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
- numBytesOut =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
- }
-
- public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
- String metricName) {
- numRecordsOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
- nodeId)
- .meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
- }
-
- public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
- String metricName) {
- numBytesOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
- .addGroup(NODE_ID, nodeId)
- .meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
- }
-
- public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
- String metricName) {
- dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
- }
-
- public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
- String metricName) {
- dirtyBytes =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
- .counter(metricName);
-
- }
-
- public Counter getNumRecordsOut() {
- return numRecordsOut;
- }
-
- public Counter getNumBytesOut() {
- return numBytesOut;
- }
-
- public Counter getDirtyRecords() {
- return dirtyRecords;
- }
-
- public Counter getDirtyBytes() {
- return dirtyBytes;
- }
-
- public Meter getNumRecordsOutPerSecond() {
- return numRecordsOutPerSecond;
- }
-
- public Meter getNumBytesOutPerSecond() {
- return numBytesOutPerSecond;
- }
-
-}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 33991d220..1e0369e2c 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -65,6 +65,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
private TypeInformation<RowData> rowDataTypeInformation;
private DataType[] fieldDataTypes;
private String inLongMetric;
+ private String auditHostAndPorts;
public JdbcDynamicOutputFormatBuilder() {
@@ -240,6 +241,11 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
return this;
}
+ public JdbcDynamicOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) {
+ this.auditHostAndPorts = auditHostAndPorts;
+ return this;
+ }
+
public JdbcBatchingOutputFormat<RowData, ?, ?> build() {
checkNotNull(jdbcOptions, "jdbc options can not be null");
checkNotNull(dmlOptions, "jdbc dml options can not be null");
@@ -258,7 +264,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
createBufferReduceExecutor(
dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
JdbcBatchingOutputFormat.RecordExtractor.identity(),
- inLongMetric);
+ inLongMetric,
+ auditHostAndPorts);
} else {
// append only query
final String sql =
@@ -278,7 +285,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
sql,
rowDataTypeInformation),
JdbcBatchingOutputFormat.RecordExtractor.identity(),
- inLongMetric);
+ inLongMetric,
+ auditHostAndPorts);
}
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index f60c3f8c8..328f86a45 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -44,6 +44,8 @@ import java.util.Optional;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
/**
* Copy from org.apache.flink:flink-connector-jdbc_2.11:1.13.5
@@ -174,12 +176,6 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
.defaultValue(false)
.withDescription("Whether to support sink update/delete data without primaryKey.");
- public static final ConfigOption<String> INLONG_METRIC =
- ConfigOptions.key("inlong.metric")
- .stringType()
- .defaultValue("")
- .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
-
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper =
@@ -192,14 +188,16 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
boolean appendMode = config.get(SINK_APPEND_MODE);
- String inLongMetric = config.get(INLONG_METRIC);
+ String inLongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
return new JdbcDynamicTableSink(
jdbcOptions,
getJdbcExecutionOptions(config),
getJdbcDmlOptions(jdbcOptions, physicalSchema),
physicalSchema,
appendMode,
- inLongMetric);
+ inLongMetric,
+ auditHostAndPorts);
}
@Override
@@ -326,6 +324,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
optionalOptions.add(MAX_RETRY_TIMEOUT);
optionalOptions.add(DIALECT_IMPL);
optionalOptions.add(INLONG_METRIC);
+ optionalOptions.add(INLONG_AUDIT);
return optionalOptions;
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index 69998c1be..92e54e816 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -51,6 +51,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
private final String dialectName;
private final String inLongMetric;
+ private final String auditHostAndPorts;
private final boolean appendMode;
public JdbcDynamicTableSink(
@@ -59,7 +60,8 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
JdbcDmlOptions dmlOptions,
TableSchema tableSchema,
boolean appendMode,
- String inLongMetric) {
+ String inLongMetric,
+ String auditHostAndPorts) {
this.jdbcOptions = jdbcOptions;
this.executionOptions = executionOptions;
this.dmlOptions = dmlOptions;
@@ -67,6 +69,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
this.dialectName = dmlOptions.getDialect().dialectName();
this.appendMode = appendMode;
this.inLongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
@Override
@@ -99,6 +102,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
builder.setRowDataTypeInfo(rowDataTypeInformation);
builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
builder.setInLongMetric(inLongMetric);
+ builder.setAuditHostAndPorts(auditHostAndPorts);
return SinkFunctionProvider.of(
new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
}
@@ -106,7 +110,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
@Override
public DynamicTableSink copy() {
return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions,
- tableSchema, appendMode, inLongMetric);
+ tableSchema, appendMode, inLongMetric, auditHostAndPorts);
}
@Override
@@ -127,11 +131,14 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
&& Objects.equals(executionOptions, that.executionOptions)
&& Objects.equals(dmlOptions, that.dmlOptions)
&& Objects.equals(tableSchema, that.tableSchema)
- && Objects.equals(dialectName, that.dialectName);
+ && Objects.equals(dialectName, that.dialectName)
+ && Objects.equals(inLongMetric, that.inLongMetric)
+ && Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
}
@Override
public int hashCode() {
- return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName);
+ return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName,
+ inLongMetric, auditHostAndPorts);
}
}
\ No newline at end of file