You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/08 07:59:22 UTC
[inlong] 01/02: [INLONG-5608][Sort] Reformat connector codes for reporting metrics (#5612)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit bbffb2e840b208b944ab3874e65842a49ed264aa
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Fri Sep 2 20:31:15 2022 +0800
[INLONG-5608][Sort] Reformat connector codes for reporting metrics (#5612)
---
.../sort/elasticsearch6/ElasticsearchSink.java | 18 +++++-----
.../table/Elasticsearch6DynamicSink.java | 18 +++++-----
.../table/Elasticsearch6DynamicSinkFactory.java | 4 +--
.../sort/elasticsearch7/ElasticsearchSink.java | 18 +++++-----
.../table/Elasticsearch7DynamicSink.java | 18 +++++-----
.../table/Elasticsearch7DynamicSinkFactory.java | 4 +--
.../sort/elasticsearch/ElasticsearchSinkBase.java | 17 +++++-----
.../table/RowElasticsearchSinkFunction.java | 20 ++++++------
.../sort/hbase/HBase2DynamicTableFactory.java | 2 +-
.../org/apache/inlong/sort/hive/HiveTableSink.java | 12 +++----
.../hive/filesystem/AbstractStreamingWriter.java | 18 +++++-----
.../sort/hive/filesystem/CompactFileWriter.java | 4 +--
.../sort/hive/filesystem/StreamingFileWriter.java | 4 +--
.../inlong/sort/hive/filesystem/StreamingSink.java | 8 ++---
.../sort/hive/table/HiveTableInlongFactory.java | 4 +--
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 14 ++++----
.../sort/iceberg/sink/IcebergStreamWriter.java | 18 +++++-----
.../jdbc/internal/JdbcBatchingOutputFormat.java | 38 +++++++++++-----------
.../jdbc/internal/TableJdbcUpsertOutputFormat.java | 8 ++---
.../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 10 +++---
.../sort/jdbc/table/JdbcDynamicTableFactory.java | 4 +--
.../sort/jdbc/table/JdbcDynamicTableSink.java | 14 ++++----
.../inlong/sort/kafka/FlinkKafkaProducer.java | 30 ++++++++---------
.../apache/inlong/sort/kafka/KafkaDynamicSink.java | 14 ++++----
.../table/DynamicKafkaDeserializationSchema.java | 22 ++++++-------
.../sort/kafka/table/KafkaDynamicSource.java | 10 +++---
.../sort/kafka/table/KafkaDynamicTableFactory.java | 16 ++++-----
.../table/UpsertKafkaDynamicTableFactory.java | 4 +--
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 3 +-
.../mongodb/table/MongoDBTableSourceFactory.java | 2 +-
.../sort/cdc/debezium/DebeziumSourceFunction.java | 3 +-
.../mysql/table/MySqlTableInlongSourceFactory.java | 2 +-
.../sort/cdc/oracle/DebeziumSourceFunction.java | 3 +-
.../cdc/oracle/table/OracleTableSourceFactory.java | 2 +-
.../DebeziumSourceFunction.java | 3 +-
.../cdc/postgres/table/PostgreSQLTableFactory.java | 2 +-
.../pulsar/table/PulsarDynamicTableFactory.java | 8 +++--
.../table/UpsertPulsarDynamicTableFactory.java | 2 +-
.../sqlserver/table/DebeziumSourceFunction.java | 16 ++++-----
.../cdc/sqlserver/table/SqlServerTableFactory.java | 2 +-
40 files changed, 209 insertions(+), 210 deletions(-)
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
index c30787d7c..32848dfa6 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -70,14 +70,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler,
RestClientFactory restClientFactory,
- String inLongMetric) {
+ String inlongMetric) {
super(
new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
bulkRequestsConfig,
elasticsearchSinkFunction,
failureHandler,
- inLongMetric);
+ inlongMetric);
}
/**
@@ -95,7 +95,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
private RestClientFactory restClientFactory = restClientBuilder -> {
};
- private String inLongMetric = null;
+ private String inlongMetric = null;
/**
* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
@@ -114,10 +114,10 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
/**
* set InLongMetric for reporting metrics
- * @param inLongMetric
+ * @param inlongMetric
*/
- public void setInLongMetric(String inLongMetric) {
- this.inLongMetric = inLongMetric;
+ public void setInLongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
}
/**
@@ -244,7 +244,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
elasticsearchSinkFunction,
failureHandler,
restClientFactory,
- inLongMetric
+ inlongMetric
);
}
@@ -262,7 +262,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
&& Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
&& Objects.equals(failureHandler, builder.failureHandler)
&& Objects.equals(restClientFactory, builder.restClientFactory)
- && Objects.equals(inLongMetric, builder.inLongMetric);
+ && Objects.equals(inlongMetric, builder.inlongMetric);
}
@Override
@@ -273,7 +273,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
bulkRequestsConfig,
failureHandler,
restClientFactory,
- inLongMetric);
+ inlongMetric);
}
}
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index 2550edeb4..bff1ad476 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -64,7 +64,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
private final EncodingFormat<SerializationSchema<RowData>> format;
private final TableSchema schema;
private final Elasticsearch6Configuration config;
- private final String inLongMetric;
+ private final String inlongMetric;
private final String auditHostAndPorts;
private final ElasticSearchBuilderProvider builderProvider;
@@ -82,9 +82,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
EncodingFormat<SerializationSchema<RowData>> format,
Elasticsearch6Configuration config,
TableSchema schema,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
- this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric, auditHostAndPorts);
+ this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts);
}
Elasticsearch6DynamicSink(
@@ -92,13 +92,13 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
Elasticsearch6Configuration config,
TableSchema schema,
ElasticSearchBuilderProvider builderProvider,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this.format = format;
this.schema = schema;
this.config = config;
this.builderProvider = builderProvider;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -133,7 +133,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
RoutingExtractor.createRoutingExtractor(
schema, config.getRoutingField().orElse(null)),
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
final ElasticsearchSink.Builder<RowData> builder =
@@ -144,7 +144,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
- builder.setInLongMetric(inLongMetric);
+ builder.setInLongMetric(inlongMetric);
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
@@ -198,12 +198,12 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
&& Objects.equals(schema, that.schema)
&& Objects.equals(config, that.config)
&& Objects.equals(builderProvider, that.builderProvider)
- && Objects.equals(inLongMetric, that.inLongMetric);
+ && Objects.equals(inlongMetric, that.inlongMetric);
}
@Override
public int hashCode() {
- return Objects.hash(format, schema, config, builderProvider, inLongMetric);
+ return Objects.hash(format, schema, config, builderProvider, inlongMetric);
}
@FunctionalInterface
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
index ce1581ddd..df5937b64 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -107,12 +107,12 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
validate(config, configuration);
- String inLongMetric = helper.getOptions().get(INLONG_METRIC);
+ String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
return new Elasticsearch6DynamicSink(
- format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric, auditHostAndPorts);
+ format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts);
}
private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
index eab2dd119..10b7cdfb8 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -71,14 +71,14 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler,
RestClientFactory restClientFactory,
- String inLongMetric) {
+ String inlongMetric) {
super(
new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
bulkRequestsConfig,
elasticsearchSinkFunction,
failureHandler,
- inLongMetric);
+ inlongMetric);
}
/**
@@ -96,7 +96,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
private RestClientFactory restClientFactory = restClientBuilder -> {
};
- private String inLongMetric = null;
+ private String inlongMetric = null;
/**
* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
@@ -115,10 +115,10 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
/**
* set InLongMetric for reporting metrics
- * @param inLongMetric
+ * @param inlongMetric
*/
- public void setInLongMetric(String inLongMetric) {
- this.inLongMetric = inLongMetric;
+ public void setInLongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
}
/**
@@ -245,7 +245,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
elasticsearchSinkFunction,
failureHandler,
restClientFactory,
- inLongMetric);
+ inlongMetric);
}
@Override
@@ -262,7 +262,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
&& Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
&& Objects.equals(failureHandler, builder.failureHandler)
&& Objects.equals(restClientFactory, builder.restClientFactory)
- && Objects.equals(inLongMetric, builder.inLongMetric);
+ && Objects.equals(inlongMetric, builder.inlongMetric);
}
@Override
@@ -273,7 +273,7 @@ public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevel
bulkRequestsConfig,
failureHandler,
restClientFactory,
- inLongMetric);
+ inlongMetric);
}
}
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index 202990336..392e35dbf 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -65,7 +65,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
private final EncodingFormat<SerializationSchema<RowData>> format;
private final TableSchema schema;
private final Elasticsearch7Configuration config;
- private final String inLongMetric;
+ private final String inlongMetric;
private final String auditHostAndPorts;
private final ElasticSearchBuilderProvider builderProvider;
@@ -83,9 +83,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
EncodingFormat<SerializationSchema<RowData>> format,
Elasticsearch7Configuration config,
TableSchema schema,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
- this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric, auditHostAndPorts);
+ this(format, config, schema, (ElasticsearchSink.Builder::new), inlongMetric, auditHostAndPorts);
}
Elasticsearch7DynamicSink(
@@ -93,13 +93,13 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
Elasticsearch7Configuration config,
TableSchema schema,
ElasticSearchBuilderProvider builderProvider,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this.format = format;
this.schema = schema;
this.config = config;
this.builderProvider = builderProvider;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -134,7 +134,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
RoutingExtractor.createRoutingExtractor(
schema, config.getRoutingField().orElse(null)),
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
final ElasticsearchSink.Builder<RowData> builder =
@@ -145,7 +145,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
- builder.setInLongMetric(inLongMetric);
+ builder.setInLongMetric(inlongMetric);
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
@@ -199,13 +199,13 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
&& Objects.equals(schema, that.schema)
&& Objects.equals(config, that.config)
&& Objects.equals(builderProvider, that.builderProvider)
- && Objects.equals(inLongMetric, that.inLongMetric)
+ && Objects.equals(inlongMetric, that.inlongMetric)
&& Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
}
@Override
public int hashCode() {
- return Objects.hash(format, schema, config, builderProvider, inLongMetric, auditHostAndPorts);
+ return Objects.hash(format, schema, config, builderProvider, inlongMetric, auditHostAndPorts);
}
@FunctionalInterface
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
index 0f27db9f1..d29c646bd 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
@@ -107,12 +107,12 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory
validate(config, configuration);
- String inLongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
+ String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
return new Elasticsearch7DynamicSink(
- format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric, auditHostAndPorts);
+ format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, auditHostAndPorts);
}
private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) {
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index da5b5f9ef..000e1c23a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
/**
* Base class for all Flink Elasticsearch Sinks.
@@ -121,7 +122,7 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
* sink is closed.
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
- private final String inLongMetric;
+ private final String inlongMetric;
/**
* If true, the producer will wait until all outstanding action requests have been sent to
* Elasticsearch.
@@ -167,8 +168,8 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
Map<String, String> userConfig,
ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
ActionRequestFailureHandler failureHandler,
- String inLongMetric) {
- this.inLongMetric = inLongMetric;
+ String inlongMetric) {
+ this.inlongMetric = inlongMetric;
this.callBridge = checkNotNull(callBridge);
this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
this.failureHandler = checkNotNull(failureHandler);
@@ -265,11 +266,11 @@ public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends
@Override
public void open(Configuration parameters) throws Exception {
client = callBridge.createClient(userConfig);
- if (inLongMetric != null && !inLongMetric.isEmpty()) {
- String[] inLongMetricArray = inLongMetric.split("&");
- String groupId = inLongMetricArray[0];
- String streamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
+ if (inlongMetric != null && !inlongMetric.isEmpty()) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ String groupId = inlongMetricArray[0];
+ String streamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup());
sinkMetricData.registerMetricsForDirtyBytes();
sinkMetricData.registerMetricsForDirtyRecords();
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 80abeef54..0ae93231d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -55,7 +55,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
private final XContentType contentType;
private final RequestFactory requestFactory;
private final Function<RowData, String> createKey;
- private final String inLongMetric;
+ private final String inlongMetric;
private final String auditHostAndPorts;
private final Function<RowData, String> createRouting;
@@ -77,7 +77,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
RequestFactory requestFactory,
Function<RowData, String> createKey,
@Nullable Function<RowData, String> createRouting,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
this.docType = docType;
@@ -86,7 +86,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
this.requestFactory = Preconditions.checkNotNull(requestFactory);
this.createKey = Preconditions.checkNotNull(createKey);
this.createRouting = createRouting;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -94,11 +94,11 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
public void open(RuntimeContext ctx) {
indexGenerator.open();
this.runtimeContext = ctx;
- if (inLongMetric != null && !inLongMetric.isEmpty()) {
- String[] inLongMetricArray = inLongMetric.split("&");
- groupId = inLongMetricArray[0];
- streamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
+ if (inlongMetric != null && !inlongMetric.isEmpty()) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ groupId = inlongMetricArray[0];
+ streamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
sinkMetricData = new SinkMetricData(groupId, streamId, nodeId, runtimeContext.getMetricGroup());
sinkMetricData.registerMetricsForNumBytesOut();
sinkMetricData.registerMetricsForNumRecordsOut();
@@ -202,7 +202,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
&& contentType == that.contentType
&& Objects.equals(requestFactory, that.requestFactory)
&& Objects.equals(createKey, that.createKey)
- && Objects.equals(inLongMetric, that.inLongMetric);
+ && Objects.equals(inlongMetric, that.inlongMetric);
}
@Override
@@ -214,6 +214,6 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
contentType,
requestFactory,
createKey,
- inLongMetric);
+ inlongMetric);
}
}
diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
index 63e3f87a4..da14c947a 100644
--- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/HBase2DynamicTableFactory.java
@@ -104,7 +104,7 @@ public class HBase2DynamicTableFactory
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
- String inlongMetric = tableOptions.get(INLONG_METRIC);
+ String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = tableOptions.get(INLONG_AUDIT);
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
index 821cfe72c..7d0048cea 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java
@@ -118,7 +118,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>();
private boolean overwrite = false;
private boolean dynamicGrouping = false;
- private String inLongMetric;
+ private String inlongMetric;
private String auditHostAndPorts;
public HiveTableSink(
@@ -127,7 +127,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
ObjectIdentifier identifier,
CatalogTable table,
@Nullable Integer configuredParallelism,
- final String inLongMetric,
+ final String inlongMetric,
final String auditHostAndPorts) {
this.flinkConf = flinkConf;
this.jobConf = jobConf;
@@ -140,7 +140,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
this.configuredParallelism = configuredParallelism;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -339,7 +339,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
createCompactReaderFactory(sd, tableProps),
compactionSize,
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
} else {
writerStream =
@@ -348,7 +348,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
bucketCheckInterval,
builder,
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -492,7 +492,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
identifier,
catalogTable,
configuredParallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
sink.staticPartitionSpec = staticPartitionSpec;
sink.overwrite = overwrite;
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 16a7bd528..5d0ca7bf0 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -56,7 +56,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
bucketsBuilder;
@Nullable
- private String inLongMetric;
+ private String inlongMetric;
@Nullable
private String auditHostAndPorts;
@@ -77,11 +77,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
StreamingFileSink.BucketsBuilder<
IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
bucketsBuilder,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this.bucketCheckInterval = bucketCheckInterval;
this.bucketsBuilder = bucketsBuilder;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -111,13 +111,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
@Override
public void open() throws Exception {
super.open();
- if (inLongMetric != null) {
- String[] inLongMetricArray = inLongMetric.split(DELIMITER);
- String inLongGroupId = inLongMetricArray[0];
- String inLongStreamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
+ if (inlongMetric != null) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ String inlongGroupId = inlongMetricArray[0];
+ String inlongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
metricData = new SinkMetricData(
- inLongGroupId, inLongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
+ inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java
index 22e4b3a5d..2c368ba19 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/CompactFileWriter.java
@@ -36,9 +36,9 @@ public class CompactFileWriter<T>
StreamingFileSink.BucketsBuilder<
T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
bucketsBuilder,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
- super(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+ super(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
}
@Override
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java
index 43703598a..b0c8aee26 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingFileWriter.java
@@ -45,9 +45,9 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
StreamingFileSink.BucketsBuilder<
IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>>
bucketsBuilder,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
- super(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+ super(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
}
@Override
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
index 31bd6c1b6..1c7663064 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/StreamingSink.java
@@ -65,10 +65,10 @@ public class StreamingSink {
T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
bucketsBuilder,
int parallelism,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
StreamingFileWriter<T> fileWriter =
- new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+ new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
return inputStream
.transform(
StreamingFileWriter.class.getSimpleName(),
@@ -92,10 +92,10 @@ public class StreamingSink {
CompactReader.Factory<T> readFactory,
long targetFileSize,
int parallelism,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
CompactFileWriter<T> writer = new CompactFileWriter<>(
- bucketCheckInterval, bucketsBuilder, inLongMetric, auditHostAndPorts);
+ bucketCheckInterval, bucketsBuilder, inlongMetric, auditHostAndPorts);
SupplierWithException<FileSystem, IOException> fsSupplier =
(SupplierWithException<FileSystem, IOException> & Serializable)
diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
index 9dbe065aa..5a744cd0b 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
@@ -102,7 +102,7 @@ public class HiveTableInlongFactory implements DynamicTableSourceFactory, Dynami
Integer configuredParallelism =
Configuration.fromMap(context.getCatalogTable().getOptions())
.get(FileSystemOptions.SINK_PARALLELISM);
- final String inLongMetric = context.getCatalogTable().getOptions()
+ final String inlongMetric = context.getCatalogTable().getOptions()
.getOrDefault(INLONG_METRIC.key(), INLONG_METRIC.defaultValue());
final String auditHostAndPorts = context.getCatalogTable().getOptions()
.getOrDefault(INLONG_AUDIT.key(), INLONG_AUDIT.defaultValue());
@@ -113,7 +113,7 @@ public class HiveTableInlongFactory implements DynamicTableSourceFactory, Dynami
context.getObjectIdentifier(),
context.getCatalogTable(),
configuredParallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
} else {
return FactoryUtil.createTableSink(
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a7099c3cc..d86857338 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -134,7 +134,7 @@ public class FlinkSink {
private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
- private String inLongMetric = null;
+ private String inlongMetric = null;
private String auditHostAndPorts = null;
private Builder() {
@@ -198,12 +198,12 @@ public class FlinkSink {
/**
* Add metric output for iceberg writer
- * @param inLongMetric
+ * @param inlongMetric
* @param auditHostAndPorts
* @return
*/
- public Builder metric(String inLongMetric, String auditHostAndPorts) {
- this.inLongMetric = inLongMetric;
+ public Builder metric(String inlongMetric, String auditHostAndPorts) {
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
return this;
}
@@ -400,7 +400,7 @@ public class FlinkSink {
}
IcebergStreamWriter<RowData> streamWriter = createStreamWriter(
- table, flinkRowType, equalityFieldIds, upsertMode, inLongMetric, auditHostAndPorts);
+ table, flinkRowType, equalityFieldIds, upsertMode, inlongMetric, auditHostAndPorts);
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -474,7 +474,7 @@ public class FlinkSink {
RowType flinkRowType,
List<Integer> equalityFieldIds,
boolean upsert,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
Preconditions.checkArgument(table != null, "Iceberg table should't be null");
Map<String, String> props = table.properties();
@@ -486,7 +486,7 @@ public class FlinkSink {
serializableTable, flinkRowType, targetFileSize,
fileFormat, equalityFieldIds, upsert);
- return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inLongMetric, auditHostAndPorts);
+ return new IcebergStreamWriter<>(table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts);
}
private static FileFormat getFileFormat(Map<String, String> properties) {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 118a97e15..75eca46c5 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -43,7 +43,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
private final String fullTableName;
private final TaskWriterFactory<T> taskWriterFactory;
- private final String inLongMetric;
+ private final String inlongMetric;
private final String auditHostAndPorts;
private transient TaskWriter<T> writer;
@@ -55,11 +55,11 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
IcebergStreamWriter(
String fullTableName,
TaskWriterFactory<T> taskWriterFactory,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this.fullTableName = fullTableName;
this.taskWriterFactory = taskWriterFactory;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -76,13 +76,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
this.writer = taskWriterFactory.create();
// Initialize metric
- if (inLongMetric != null) {
- String[] inLongMetricArray = inLongMetric.split(DELIMITER);
- String inLongGroupId = inLongMetricArray[0];
- String inLongStreamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
+ if (inlongMetric != null) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ String inlongGroupId = inlongMetricArray[0];
+ String inlongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
metricData = new SinkMetricData(
- inLongGroupId, inLongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
+ inlongGroupId, inlongStreamId, nodeId, getRuntimeContext().getMetricGroup(), auditHostAndPorts);
metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 98f0f0cf6..66af78c4d 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -69,7 +69,7 @@ public class JdbcBatchingOutputFormat<
private final JdbcExecutionOptions executionOptions;
private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
- private final String inLongMetric;
+ private final String inlongMetric;
private final String auditHostAndPorts;
private transient JdbcExec jdbcStatementExecutor;
private transient int batchCount = 0;
@@ -80,8 +80,8 @@ public class JdbcBatchingOutputFormat<
private transient RuntimeContext runtimeContext;
private SinkMetricData sinkMetricData;
- private String inLongGroupId;
- private String inLongStreamId;
+ private String inlongGroupId;
+ private String inlongStreamId;
private transient AuditImp auditImp;
private Long dataSize = 0L;
private Long rowSize = 0L;
@@ -91,13 +91,13 @@ public class JdbcBatchingOutputFormat<
@Nonnull JdbcExecutionOptions executionOptions,
@Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
@Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
super(connectionProvider);
this.executionOptions = checkNotNull(executionOptions);
this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
this.jdbcRecordExtractor = checkNotNull(recordExtractor);
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -130,12 +130,12 @@ public class JdbcBatchingOutputFormat<
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
this.runtimeContext = getRuntimeContext();
- if (inLongMetric != null && !inLongMetric.isEmpty()) {
- String[] inLongMetricArray = inLongMetric.split(DELIMITER);
- inLongGroupId = inLongMetricArray[0];
- inLongStreamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
- sinkMetricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, runtimeContext.getMetricGroup());
+ if (inlongMetric != null && !inlongMetric.isEmpty()) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ inlongGroupId = inlongMetricArray[0];
+ inlongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ sinkMetricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, runtimeContext.getMetricGroup());
sinkMetricData.registerMetricsForDirtyBytes();
sinkMetricData.registerMetricsForDirtyRecords();
sinkMetricData.registerMetricsForNumBytesOut();
@@ -207,8 +207,8 @@ public class JdbcBatchingOutputFormat<
if (auditImp != null) {
auditImp.add(
AUDIT_SORT_INPUT,
- inLongGroupId,
- inLongStreamId,
+ inlongGroupId,
+ inlongStreamId,
System.currentTimeMillis(),
1,
length);
@@ -371,7 +371,7 @@ public class JdbcBatchingOutputFormat<
private String[] fieldNames;
private String[] keyFields;
private int[] fieldTypes;
- private String inLongMetric;
+ private String inlongMetric;
private String auditHostAndPorts;
private JdbcExecutionOptions.Builder executionOptionsBuilder =
JdbcExecutionOptions.builder();
@@ -409,10 +409,10 @@ public class JdbcBatchingOutputFormat<
}
/**
- * required, inLongMetric
+ * required, inlongMetric
*/
- public Builder setinLongMetric(String inLongMetric) {
- this.inLongMetric = inLongMetric;
+ public Builder setinlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
return this;
}
@@ -471,7 +471,7 @@ public class JdbcBatchingOutputFormat<
new SimpleJdbcConnectionProvider(options),
dml,
executionOptionsBuilder.build(),
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
} else {
// warn: don't close over builder fields
@@ -493,7 +493,7 @@ public class JdbcBatchingOutputFormat<
Preconditions.checkArgument(tuple2.f0);
return tuple2.f1;
},
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
index 4ef1ff2e3..4101913c9 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -58,14 +58,14 @@ class TableJdbcUpsertOutputFormat
JdbcConnectionProvider connectionProvider,
JdbcDmlOptions dmlOptions,
JdbcExecutionOptions batchOptions,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this(
connectionProvider,
batchOptions,
ctx -> createUpsertRowExecutor(dmlOptions, ctx),
ctx -> createDeleteExecutor(dmlOptions, ctx),
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -76,11 +76,11 @@ class TableJdbcUpsertOutputFormat
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
StatementExecutorFactory<JdbcBatchStatementExecutor<Row>>
deleteStatementExecutorFactory,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts
) {
super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1,
- inLongMetric, auditHostAndPorts);
+ inlongMetric, auditHostAndPorts);
this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 1e0369e2c..fa46f8436 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -64,7 +64,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
private boolean appendMode;
private TypeInformation<RowData> rowDataTypeInformation;
private DataType[] fieldDataTypes;
- private String inLongMetric;
+ private String inlongMetric;
private String auditHostAndPorts;
public JdbcDynamicOutputFormatBuilder() {
@@ -236,8 +236,8 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
return this;
}
- public JdbcDynamicOutputFormatBuilder setInLongMetric(String inLongMetric) {
- this.inLongMetric = inLongMetric;
+ public JdbcDynamicOutputFormatBuilder setInLongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
return this;
}
@@ -264,7 +264,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
createBufferReduceExecutor(
dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
JdbcBatchingOutputFormat.RecordExtractor.identity(),
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
} else {
// append only query
@@ -285,7 +285,7 @@ public class JdbcDynamicOutputFormatBuilder implements Serializable {
sql,
rowDataTypeInformation),
JdbcBatchingOutputFormat.RecordExtractor.identity(),
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index 328f86a45..1efe0a91b 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -188,7 +188,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
boolean appendMode = config.get(SINK_APPEND_MODE);
- String inLongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
return new JdbcDynamicTableSink(
jdbcOptions,
@@ -196,7 +196,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
getJdbcDmlOptions(jdbcOptions, physicalSchema),
physicalSchema,
appendMode,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
index 92e54e816..f6dd579b3 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -50,7 +50,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
private final TableSchema tableSchema;
private final String dialectName;
- private final String inLongMetric;
+ private final String inlongMetric;
private final String auditHostAndPorts;
private final boolean appendMode;
@@ -60,7 +60,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
JdbcDmlOptions dmlOptions,
TableSchema tableSchema,
boolean appendMode,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this.jdbcOptions = jdbcOptions;
this.executionOptions = executionOptions;
@@ -68,7 +68,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
this.tableSchema = tableSchema;
this.dialectName = dmlOptions.getDialect().dialectName();
this.appendMode = appendMode;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -101,7 +101,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
builder.setJdbcExecutionOptions(executionOptions);
builder.setRowDataTypeInfo(rowDataTypeInformation);
builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
- builder.setInLongMetric(inLongMetric);
+ builder.setInLongMetric(inlongMetric);
builder.setAuditHostAndPorts(auditHostAndPorts);
return SinkFunctionProvider.of(
new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
@@ -110,7 +110,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
@Override
public DynamicTableSink copy() {
return new JdbcDynamicTableSink(jdbcOptions, executionOptions, dmlOptions,
- tableSchema, appendMode, inLongMetric, auditHostAndPorts);
+ tableSchema, appendMode, inlongMetric, auditHostAndPorts);
}
@Override
@@ -132,13 +132,13 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
&& Objects.equals(dmlOptions, that.dmlOptions)
&& Objects.equals(tableSchema, that.tableSchema)
&& Objects.equals(dialectName, that.dialectName)
- && Objects.equals(inLongMetric, that.inLongMetric)
+ && Objects.equals(inlongMetric, that.inlongMetric)
&& Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
}
@Override
public int hashCode() {
return Objects.hash(jdbcOptions, executionOptions, dmlOptions, tableSchema, dialectName,
- inLongMetric, auditHostAndPorts);
+ inlongMetric, auditHostAndPorts);
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 75b965340..b2efd2c3e 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -215,7 +215,7 @@ public class FlinkKafkaProducer<IN>
/**
* Metric for InLong
*/
- private final String inLongMetric;
+ private final String inlongMetric;
/**
* audit host and ports
*/
@@ -245,11 +245,11 @@ public class FlinkKafkaProducer<IN>
/**
* inLong groupId
*/
- private String inLongGroupId;
+ private String inlongGroupId;
/**
* inLong streamId
*/
- private String inLongStreamId;
+ private String inlongStreamId;
/**
* sink metric data
*/
@@ -609,7 +609,7 @@ public class FlinkKafkaProducer<IN>
Properties producerConfig,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
this(
defaultTopic,
@@ -619,7 +619,7 @@ public class FlinkKafkaProducer<IN>
producerConfig,
semantic,
kafkaProducersPoolSize,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -659,13 +659,13 @@ public class FlinkKafkaProducer<IN>
Properties producerConfig,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
super(
new FlinkKafkaProducer.TransactionStateSerializer(),
new FlinkKafkaProducer.ContextStateSerializer());
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.defaultTopicId = checkNotNull(defaultTopic, "defaultTopic is null");
@@ -905,12 +905,12 @@ public class FlinkKafkaProducer<IN>
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
}
- if (inLongMetric != null && !inLongMetric.isEmpty()) {
- String[] inLongMetricArray = inLongMetric.split(DELIMITER);
- inLongGroupId = inLongMetricArray[0];
- inLongStreamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
- metricData = new SinkMetricData(inLongGroupId, inLongStreamId, nodeId, ctx.getMetricGroup());
+ if (inlongMetric != null && !inlongMetric.isEmpty()) {
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ inlongGroupId = inlongMetricArray[0];
+ inlongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup());
metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
@@ -945,8 +945,8 @@ public class FlinkKafkaProducer<IN>
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_OUTPUT,
- inLongGroupId,
- inLongStreamId,
+ inlongGroupId,
+ inlongStreamId,
System.currentTimeMillis(),
1,
record.value().length);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index 387baee79..6f66a6e3d 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -139,7 +139,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
/**
* Metric for inLong
*/
- private final String inLongMetric;
+ private final String inlongMetric;
/**
* audit host and ports
*/
@@ -172,7 +172,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
boolean upsertMode,
SinkBufferFlushMode flushMode,
@Nullable Integer parallelism,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
// Format attributes
this.consumedDataType =
@@ -200,7 +200,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
"Sink buffer flush is only supported in upsert-kafka.");
}
this.parallelism = parallelism;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -302,7 +302,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
upsertMode,
flushMode,
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
copy.metadataKeys = metadataKeys;
return copy;
@@ -337,7 +337,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(flushMode, that.flushMode)
&& Objects.equals(parallelism, that.parallelism)
- && Objects.equals(inLongMetric, that.inLongMetric)
+ && Objects.equals(inlongMetric, that.inlongMetric)
&& Objects.equals(auditHostAndPorts, that.auditHostAndPorts);
}
@@ -359,7 +359,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
upsertMode,
flushMode,
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -420,7 +420,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
properties,
FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index ef8660280..17e92abda 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -67,11 +67,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private SourceMetricData metricData;
- private String inLongGroupId;
+ private String inlongGroupId;
private String auditHostAndPorts;
- private String inLongStreamId;
+ private String inlongStreamId;
private transient AuditImp auditImp;
@@ -85,7 +85,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
if (upsertMode) {
Preconditions.checkArgument(
@@ -105,7 +105,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
upsertMode);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
- this.inlongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -117,11 +117,11 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
}
valueDeserialization.open(context);
if (inlongMetric != null && !inlongMetric.isEmpty()) {
- String[] inLongMetricArray = inlongMetric.split(DELIMITER);
- inLongGroupId = inLongMetricArray[0];
- inLongStreamId = inLongMetricArray[1];
- String nodeId = inLongMetricArray[2];
- metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, context.getMetricGroup());
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ inlongGroupId = inlongMetricArray[0];
+ inlongStreamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
metricData.registerMetricsForNumBytesIn();
metricData.registerMetricsForNumBytesInPerSecond();
metricData.registerMetricsForNumRecordsIn();
@@ -186,8 +186,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_INPUT,
- inLongGroupId,
- inLongStreamId,
+ inlongGroupId,
+ inlongStreamId,
System.currentTimeMillis(),
1,
record.value().length);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index d49af2d00..f3580a8f1 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -140,7 +140,7 @@ public class KafkaDynamicSource
/** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
protected final boolean upsertMode;
- protected final String inLongMetric;
+ protected final String inlongMetric;
protected final String auditHostAndPorts;
@@ -158,7 +158,7 @@ public class KafkaDynamicSource
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
boolean upsertMode,
- final String inLongMetric,
+ final String inlongMetric,
final String auditHostAndPorts) {
// Format attributes
this.physicalDataType =
@@ -192,7 +192,7 @@ public class KafkaDynamicSource
specificStartupOffsets, "Specific offsets must not be null.");
this.startupTimestampMillis = startupTimestampMillis;
this.upsertMode = upsertMode;
- this.inLongMetric = inLongMetric;
+ this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
}
@@ -214,7 +214,7 @@ public class KafkaDynamicSource
final FlinkKafkaConsumer<RowData> kafkaConsumer =
createKafkaConsumer(keyDeserialization, valueDeserialization,
- producedTypeInfo, inLongMetric, auditHostAndPorts);
+ producedTypeInfo, inlongMetric, auditHostAndPorts);
return SourceFunctionProvider.of(kafkaConsumer, false);
}
@@ -284,7 +284,7 @@ public class KafkaDynamicSource
startupMode,
specificStartupOffsets,
startupTimestampMillis,
- upsertMode, inLongMetric, auditHostAndPorts);
+ upsertMode, inlongMetric, auditHostAndPorts);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 8efee1317..2127886d3 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -254,7 +254,7 @@ public class KafkaDynamicTableFactory
final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
- final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
@@ -271,7 +271,7 @@ public class KafkaDynamicTableFactory
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -307,7 +307,7 @@ public class KafkaDynamicTableFactory
final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
- final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
@@ -324,7 +324,7 @@ public class KafkaDynamicTableFactory
getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
getSinkSemantic(tableOptions),
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -343,7 +343,7 @@ public class KafkaDynamicTableFactory
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
return new KafkaDynamicSource(
physicalDataType,
@@ -359,7 +359,7 @@ public class KafkaDynamicTableFactory
specificStartupOffsets,
startupTimestampMillis,
false,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
@@ -376,7 +376,7 @@ public class KafkaDynamicTableFactory
FlinkKafkaPartitioner<RowData> partitioner,
KafkaSinkSemantic semantic,
Integer parallelism,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
return new KafkaDynamicSink(
physicalDataType,
@@ -394,7 +394,7 @@ public class KafkaDynamicTableFactory
false,
SinkBufferFlushMode.DISABLED,
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 6e803cfd9..e3fa28abc 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -256,7 +256,7 @@ public class UpsertKafkaDynamicTableFactory
Duration batchInterval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
SinkBufferFlushMode flushMode =
new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
- String inLongMetric = tableOptions.get(INLONG_METRIC);
+ String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
@@ -277,7 +277,7 @@ public class UpsertKafkaDynamicTableFactory
true,
flushMode,
parallelism,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 450990eb4..42bb53732 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index d235bc860..919c227e4 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -224,7 +224,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zoneId)
? ZoneId.systemDefault()
: ZoneId.of(zoneId);
- final String inlongMetric = config.get(INLONG_METRIC);
+ final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
final String inlongAudit = config.get(INLONG_AUDIT);
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index d5b3c676b..a7eebdbcd 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -48,7 +48,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index c3c475e92..c8a70b8bd 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -122,7 +122,7 @@ public class MySqlTableInlongSourceFactory implements DynamicTableSourceFactory
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JdbcUrlUtils.PROPERTIES_PREFIX);
final ReadableConfig config = helper.getOptions();
- final String inlongMetric = config.get(INLONG_METRIC);
+ final String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
final String inlongAudit = config.get(INLONG_AUDIT);
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
final String hostname = config.get(HOSTNAME);
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
index 4098cc563..1458693fb 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index 830bfd078..0b4471ae3 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -112,7 +112,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
int port = config.get(PORT);
StartupOptions startupOptions = getStartupOptions(config);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
- String inlongMetric = config.get(INLONG_METRIC);
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = config.get(INLONG_AUDIT);
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
return new OracleTableSource(
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index ece63db62..cd78621db 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -59,7 +59,6 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -418,7 +417,7 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index e13fc49ae..a886f8aef 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -125,7 +125,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
String pluginName = config.get(DECODING_PLUGIN_NAME);
String slotName = config.get(SLOT_NAME);
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
- String inlongMetric = config.get(INLONG_METRIC);
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String inlongAudit = config.get(INLONG_AUDIT);
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric, inlongAudit);
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 13d6e7c21..bf74dc8d2 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -273,7 +273,9 @@ public class PulsarDynamicTableFactory implements
String adminUrl = tableOptions.get(ADMIN_URL);
String serviceUrl = tableOptions.get(SERVICE_URL);
- String inlongMetric = tableOptions.get(INLONG_METRIC);
+
+ String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
return createPulsarTableSource(
@@ -361,7 +363,7 @@ public class PulsarDynamicTableFactory implements
String adminUrl,
Properties properties,
PulsarTableOptions.StartupOptions startupOptions,
- String inLongMetric,
+ String inlongMetric,
String auditHostAndPorts) {
return new PulsarDynamicTableSource(
physicalDataType,
@@ -377,7 +379,7 @@ public class PulsarDynamicTableFactory implements
properties,
startupOptions,
false,
- inLongMetric,
+ inlongMetric,
auditHostAndPorts);
}
}
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index c709daa66..ff0361b16 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -187,7 +187,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
String serverUrl = tableOptions.get(SERVICE_URL);
List<String> topics = tableOptions.get(TOPIC);
String topicPattern = tableOptions.get(TOPIC_PATTERN);
- String inlongMetric = tableOptions.get(INLONG_METRIC);
+ String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
return new PulsarDynamicTableSource(
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index c1cea9dba..c28115846 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -217,11 +217,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
private SourceMetricData metricData;
- private String inLongGroupId;
+ private String inlongGroupId;
private String auditHostAndPorts;
- private String inLongStreamId;
+ private String inlongStreamId;
private transient AuditImp auditImp;
@@ -413,11 +413,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
metricGroup.gauge(
"sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime());
if (StringUtils.isNotEmpty(this.inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split(Constants.DELIMITER);
- inLongGroupId = inlongMetricArray[0];
- inLongStreamId = inlongMetricArray[1];
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ inlongGroupId = inlongMetricArray[0];
+ inlongStreamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- metricData = new SourceMetricData(inLongGroupId, inLongStreamId, nodeId, metricGroup);
+ metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, metricGroup);
metricData.registerMetricsForNumRecordsIn();
metricData.registerMetricsForNumBytesIn();
metricData.registerMetricsForNumBytesInPerSecond();
@@ -511,8 +511,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
if (auditImp != null) {
auditImp.add(
Constants.AUDIT_SORT_INPUT,
- inLongGroupId,
- inLongStreamId,
+ inlongGroupId,
+ inlongStreamId,
System.currentTimeMillis(),
1,
record.value().toString().getBytes(StandardCharsets.UTF_8).length);
diff --git a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
index e6a7e5be6..39e6a642c 100644
--- a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
+++ b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
@@ -114,7 +114,7 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory {
String schemaName = config.get(SCHEMA_NAME);
String databaseName = config.get(DATABASE_NAME);
String tableName = config.get(TABLE_NAME);
- String inlongMetric = config.get(INLONG_METRIC);
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = config.get(INLONG_AUDIT);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
int port = config.get(PORT);