You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/12/24 07:33:23 UTC
(seatunnel) branch dev updated: [Feature][Core] Support record metrics in flink engine (#6035)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a9ec9d5cc5 [Feature][Core] Support record metrics in flink engine (#6035)
a9ec9d5cc5 is described below
commit a9ec9d5cc5ac402778de080bb4b39ce315a46c3c
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Sun Dec 24 15:33:17 2023 +0800
[Feature][Core] Support record metrics in flink engine (#6035)
---
.../seatunnel/common/utils/DateTimeUtils.java | 16 +++
.../seatunnel/common/utils/DateTimeUtilsTest.java | 57 ++++++++
.../starter/flink/execution/FlinkExecution.java | 37 +++--
.../e2e/connector/fake/FlinkMetricsIT.java | 156 +++++++++++++++++++++
.../fake_to_assert_verify_flink_metrics.conf | 109 ++++++++++++++
.../flink/AbstractTestFlinkContainer.java | 3 +
.../flink/metric/FlinkGroupCounter.java | 73 ++++++++++
.../flink/metric/FlinkMetricContext.java | 120 ++++++++++++++++
.../flink/sink/FlinkSinkWriterContext.java | 74 ++++++++++
.../translation/flink/metric/FlinkCounter.java | 75 ++++++++++
.../flink/metric/FlinkJobMetricsSummary.java | 106 ++++++++++++++
.../translation/flink/metric/FlinkMeter.java | 63 +++++++++
.../flink/metric/FlinkMetricContext.java | 93 ++++++++++++
.../translation/flink/sink/FlinkSink.java | 11 +-
.../translation/flink/sink/FlinkSinkWriter.java | 23 ++-
.../flink/sink/FlinkSinkWriterContext.java | 58 ++++++++
.../flink/source/FlinkRowCollector.java | 19 ++-
.../flink/source/FlinkSourceReader.java | 3 +-
.../flink/source/FlinkSourceReaderContext.java | 16 ++-
19 files changed, 1092 insertions(+), 20 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
index bf81ed626c..885634e8a7 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.common.utils;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
@@ -49,10 +51,24 @@ public class DateTimeUtils {
return LocalDateTime.parse(dateTime, FORMATTER_MAP.get(formatter));
}
+ public static LocalDateTime parse(long timestamp) {
+ return parse(timestamp, ZoneId.systemDefault());
+ }
+
+ public static LocalDateTime parse(long timestamp, ZoneId zoneId) {
+ // timestamp is epoch milliseconds; the result is the wall-clock time in zoneId
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId);
+ }
+
public static String toString(LocalDateTime dateTime, Formatter formatter) {
return dateTime.format(FORMATTER_MAP.get(formatter));
}
+ public static String toString(long timestamp, Formatter formatter) {
+ // parse(long) performs the same epoch-millis to system-default-zone conversion
+ return toString(parse(timestamp), formatter);
+ }
+
public enum Formatter {
YYYY_MM_DD_HH_MM_SS("yyyy-MM-dd HH:mm:ss"),
YYYY_MM_DD_HH_MM_SS_SPOT("yyyy.MM.dd HH:mm:ss"),
diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java
new file mode 100644
index 0000000000..ef1f971ce5
--- /dev/null
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+public class DateTimeUtilsTest {
+
+ @Test
+ public void testParseDateString() {
+ final String datetime = "2023-12-22 00:00:00";
+ LocalDateTime parse = DateTimeUtils.parse(datetime, Formatter.YYYY_MM_DD_HH_MM_SS);
+ Assertions.assertEquals(0, parse.getMinute());
+ Assertions.assertEquals(0, parse.getHour());
+ Assertions.assertEquals(0, parse.getSecond());
+ Assertions.assertEquals(22, parse.getDayOfMonth());
+ Assertions.assertEquals(12, parse.getMonth().getValue());
+ Assertions.assertEquals(2023, parse.getYear());
+ Assertions.assertEquals(0, parse.getNano());
+ }
+
+ @Test
+ public void testParseTimestamp() {
+ // 2023-12-22 12:55:20.013 in Asia/Shanghai (04:55:20.013 UTC)
+ final long timestamp = 1703220920013L;
+ LocalDateTime parse = DateTimeUtils.parse(timestamp, ZoneId.of("Asia/Shanghai"));
+
+ Assertions.assertEquals(55, parse.getMinute());
+ Assertions.assertEquals(12, parse.getHour());
+ Assertions.assertEquals(20, parse.getSecond());
+ Assertions.assertEquals(22, parse.getDayOfMonth());
+ Assertions.assertEquals(12, parse.getMonth().getValue());
+ Assertions.assertEquals(2023, parse.getYear());
+ Assertions.assertEquals(13_000_000, parse.getNano());
+ }
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 39671af080..6b3fbd9b47 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -33,10 +33,13 @@ import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.execution.TaskExecution;
import org.apache.seatunnel.core.starter.flink.FlinkStarter;
+import org.apache.seatunnel.translation.flink.metric.FlinkJobMetricsSummary;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.MalformedURLException;
@@ -51,8 +54,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/** Used to execute a SeaTunnelTask. */
-@Slf4j
public class FlinkExecution implements TaskExecution {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
+
private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
private final PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment>
sourcePluginExecuteProcessor;
@@ -109,20 +114,32 @@ public class FlinkExecution implements TaskExecution {
dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
sinkPluginExecuteProcessor.execute(dataStreams);
- log.info(
+ LOGGER.info(
"Flink Execution Plan: {}",
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
- log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
+ LOGGER.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
if (!flinkRuntimeEnvironment.isStreaming()) {
flinkRuntimeEnvironment
.getStreamExecutionEnvironment()
.setRuntimeMode(RuntimeExecutionMode.BATCH);
- log.info("Flink job Mode: {}", JobMode.BATCH);
+ LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
}
try {
- flinkRuntimeEnvironment
- .getStreamExecutionEnvironment()
- .execute(flinkRuntimeEnvironment.getJobName());
+ final long jobStartTime = System.currentTimeMillis();
+ JobExecutionResult jobResult =
+ flinkRuntimeEnvironment
+ .getStreamExecutionEnvironment()
+ .execute(flinkRuntimeEnvironment.getJobName());
+ final long jobEndTime = System.currentTimeMillis();
+
+ final FlinkJobMetricsSummary jobMetricsSummary =
+ FlinkJobMetricsSummary.builder()
+ .jobExecutionResult(jobResult)
+ .jobStartTime(jobStartTime)
+ .jobEndTime(jobEndTime)
+ .build();
+
+ LOGGER.info("Job finished, execution result: \n{}", jobMetricsSummary);
} catch (Exception e) {
throw new TaskExecuteException("Execute Flink job error", e);
}
@@ -170,9 +187,9 @@ public class FlinkExecution implements TaskExecution {
for (URL jarUrl : jars) {
if (new File(jarUrl.getFile()).exists()) {
validJars.add(jarUrl);
- log.info("Inject jar to config: {}", jarUrl);
+ LOGGER.info("Inject jar to config: {}", jarUrl);
} else {
- log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
+ LOGGER.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FlinkMetricsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FlinkMetricsIT.java
new file mode 100644
index 0000000000..abd059a8b8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FlinkMetricsIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.fake;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.flink.Flink13Container;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.SEATUNNEL})
+public class FlinkMetricsIT extends TestSuiteBase {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricsIT.class);
+
+ @TestTemplate
+ public void testFlinkMetrics(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult executeResult =
+ container.executeJob("/fake_to_assert_verify_flink_metrics.conf");
+ Assertions.assertEquals(0, executeResult.getExitCode());
+ final String jobListUrl = "http://%s:8081/jobs/overview";
+ final String jobDetailsUrl = "http://%s:8081/jobs/%s";
+ final String jobAccumulatorUrl = "http://%s:8081/jobs/%s/vertices/%s/accumulators";
+ final String jobManagerHost;
+ String dockerHost = System.getenv("DOCKER_HOST");
+ if (dockerHost == null) {
+ jobManagerHost = "localhost";
+ } else {
+ URI uri = URI.create(dockerHost);
+ jobManagerHost = uri.getHost();
+ }
+ // try-with-resources closes the http client even when an assertion below fails
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+
+ // get job id
+ HttpGet httpGet = new HttpGet(String.format(jobListUrl, jobManagerHost));
+ CloseableHttpResponse response = httpClient.execute(httpGet);
+ Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+ String responseContent = EntityUtils.toString(response.getEntity());
+ ObjectNode jsonNode = JsonUtils.parseObject(responseContent);
+ String jobId = jsonNode.get("jobs").get(0).get("jid").asText();
+ Assertions.assertNotNull(jobId);
+
+ // get job vertices
+ httpGet = new HttpGet(String.format(jobDetailsUrl, jobManagerHost, jobId));
+ response = httpClient.execute(httpGet);
+ Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+
+ responseContent = EntityUtils.toString(response.getEntity());
+ jsonNode = JsonUtils.parseObject(responseContent);
+ String verticeId = jsonNode.get("vertices").get(0).get("id").asText();
+
+ Awaitility.given()
+ .ignoreExceptions()
+ .await()
+ .atMost(10L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ HttpGet httpGetTemp =
+ new HttpGet(
+ String.format(
+ jobAccumulatorUrl,
+ jobManagerHost,
+ jobId,
+ verticeId));
+ CloseableHttpResponse responseTemp = httpClient.execute(httpGetTemp);
+ String responseContentTemp =
+ EntityUtils.toString(responseTemp.getEntity());
+ JsonNode jsonNodeTemp = JsonUtils.parseObject(responseContentTemp);
+ JsonNode metrics = jsonNodeTemp.get("user-accumulators");
+ int size = metrics.size();
+ if (size <= 0) {
+ throw new IllegalStateException(
+ "Flink metrics not synchronized yet, next round");
+ }
+ });
+
+ // get metrics
+ httpGet = new HttpGet(String.format(jobAccumulatorUrl, jobManagerHost, jobId, verticeId));
+ response = httpClient.execute(httpGet);
+ responseContent = EntityUtils.toString(response.getEntity());
+ jsonNode = JsonUtils.parseObject(responseContent);
+ JsonNode metrics = jsonNode.get("user-accumulators");
+
+ int size = metrics.size();
+
+ Assertions.assertTrue(size > 0);
+
+ Map<String, String> metricsMap = new HashMap<>();
+
+ for (JsonNode metric : metrics) {
+ String name = metric.get("name").asText();
+ String value = metric.get("value").asText();
+ metricsMap.put(name, value);
+ }
+
+ String sourceReceivedCount = metricsMap.get(MetricNames.SOURCE_RECEIVED_COUNT);
+ String sourceReceivedBytes = metricsMap.get(MetricNames.SOURCE_RECEIVED_BYTES);
+
+ Assertions.assertEquals(5, Integer.valueOf(sourceReceivedCount));
+ Assertions.assertEquals(2160, Integer.valueOf(sourceReceivedBytes));
+
+ // Due to limitations in Flink 13 version and code, the metrics on the writer side cannot be
+ // aggregated into the global accumulator and can only be viewed in the operator based on
+ // parallelism dimensions
+ if (!(container instanceof Flink13Container)) {
+ String sinkWriteCount = metricsMap.get(MetricNames.SINK_WRITE_COUNT);
+ String sinkWriteBytes = metricsMap.get(MetricNames.SINK_WRITE_BYTES);
+ Assertions.assertEquals(5, Integer.valueOf(sinkWriteCount));
+ Assertions.assertEquals(2160, Integer.valueOf(sinkWriteBytes));
+ }
+
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_verify_flink_metrics.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_verify_flink_metrics.conf
new file mode 100644
index 0000000000..c4e1798b48
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_verify_flink_metrics.conf
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 5
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 9a0b353785..82e8e79c11 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -28,6 +28,7 @@ import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
+import com.google.common.collect.Lists;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -85,6 +86,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
copySeaTunnelStarterToContainer(jobManager);
copySeaTunnelStarterLoggingToContainer(jobManager);
+ jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8081)));
+
taskManager =
new GenericContainer<>(dockerImage)
.withCommand("taskmanager")
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkGroupCounter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkGroupCounter.java
new file mode 100644
index 0000000000..bcda819ee9
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkGroupCounter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+public class FlinkGroupCounter implements Counter {
+
+ private final String name;
+
+ private final org.apache.flink.metrics.Counter delegate;
+
+ public FlinkGroupCounter(String name, org.apache.flink.metrics.Counter counter) {
+ this.name = name;
+ this.delegate = counter;
+ }
+
+ @Override
+ public void inc() {
+ this.delegate.inc();
+ }
+
+ @Override
+ public void inc(long n) {
+ this.delegate.inc(n);
+ }
+
+ @Override
+ public void dec() {
+ throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+ }
+
+ @Override
+ public void dec(long n) {
+ throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+ }
+
+ @Override
+ public void set(long n) {
+ throw new UnsupportedOperationException("Flink metrics does not support set operation");
+ }
+
+ @Override
+ public long getCount() {
+ return this.delegate.getCount();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public Unit unit() {
+ return Unit.COUNT;
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
new file mode 100644
index 0000000000..00316eb7de
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
+import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FlinkMetricContext implements MetricsContext {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricContext.class);
+
+ private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+
+ private MetricGroup metricGroup;
+
+ private StreamingRuntimeContext runtimeContext;
+
+ public FlinkMetricContext(MetricGroup metricGroup) {
+ this.metricGroup = metricGroup;
+ }
+
+ public FlinkMetricContext(StreamingRuntimeContext runtimeContext) {
+ this.runtimeContext = runtimeContext;
+ }
+
+ @Override
+ public Counter counter(String name) {
+ if (metrics.containsKey(name)) {
+ return (Counter) metrics.get(name);
+ }
+ Counter counter =
+ runtimeContext == null
+ ? new FlinkGroupCounter(name, metricGroup.counter(name))
+ : new FlinkCounter(name, runtimeContext.getLongCounter(name));
+ return this.counter(name, counter);
+ }
+
+ @Override
+ public <C extends Counter> C counter(String name, C counter) {
+ this.addMetric(name, counter);
+ return counter;
+ }
+
+ @Override
+ public Meter meter(String name) {
+ if (metrics.containsKey(name)) {
+ return (Meter) metrics.get(name);
+ }
+
+ // Why use reflection to obtain metrics group?
+ // Because the value types returned by flink 1.13 and 1.14 runtimeContext.getMetricGroup()
+ // are inconsistent
+ org.apache.flink.metrics.Meter meter;
+ if (runtimeContext == null) {
+ meter = metricGroup.meter(name, new MeterView(5));
+ } else {
+ try {
+ Field field = AbstractRuntimeUDFContext.class.getDeclaredField("metrics");
+ field.setAccessible(true);
+ MetricGroup mg = (MetricGroup) field.get(runtimeContext);
+ meter = mg.meter(name, new MeterView(5));
+ } catch (Exception e) {
+ throw new IllegalStateException("Initial meter failed", e);
+ }
+ }
+ return this.meter(name, new FlinkMeter(name, meter));
+ }
+
+ @Override
+ public <M extends Meter> M meter(String name, M meter) {
+ this.addMetric(name, meter);
+ return meter;
+ }
+
+ protected void addMetric(String name, Metric metric) {
+ if (metric == null) {
+ LOGGER.warn("Ignoring attempted add of a metric due to being null for name {}.", name);
+ return;
+ }
+ // putIfAbsent is atomic on the ConcurrentHashMap, so an already registered metric is
+ // never replaced, not even briefly; the unsynchronized lookups in counter()/meter()
+ // could otherwise observe the colliding instance between a put and its rollback.
+ Metric prior = this.metrics.putIfAbsent(name, metric);
+ if (prior != null) {
+ LOGGER.warn(
+ "Name collision: MetricsContext already contains a Metric with the name '"
+ + name
+ + "'. Metric will not be reported.");
+ }
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
new file mode 100644
index 0000000000..95f72e8ed5
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
+
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+
+public class FlinkSinkWriterContext implements SinkWriter.Context {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSinkWriterContext.class);
+
+ private final InitContext writerContext;
+
+ public FlinkSinkWriterContext(InitContext writerContext) {
+ this.writerContext = writerContext;
+ }
+
+ @Override
+ public int getIndexOfSubtask() {
+ return writerContext.getSubtaskId();
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ // In flink 1.14, it has contained runtimeContext in InitContext, so first step to detect if
+ // it is existed
+ try {
+ Field field = writerContext.getClass().getDeclaredField("runtimeContext");
+ field.setAccessible(true);
+ StreamingRuntimeContext runtimeContext =
+ (StreamingRuntimeContext) field.get(writerContext);
+ return new FlinkMetricContext(runtimeContext);
+ } catch (Exception e) {
+ LOGGER.info(
+ "Flink version is not 1.14.x, will initial MetricsContext using metricGroup");
+ }
+ // Why use reflection to obtain metrics group?
+ // Because the value types returned by flink 1.13 and 1.14 InitContext.getMetricGroup()
+ // are inconsistent
+ try {
+ Field field = writerContext.getClass().getDeclaredField("metricGroup");
+ field.setAccessible(true);
+ MetricGroup metricGroup = (MetricGroup) field.get(writerContext);
+ return new FlinkMetricContext(metricGroup);
+ } catch (Exception e) {
+ throw new IllegalStateException("Initial sink metrics failed", e);
+ }
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkCounter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkCounter.java
new file mode 100644
index 0000000000..296f46cb56
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkCounter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+
+/**
+ * SeaTunnel {@link Counter} that keeps its value in a Flink {@link LongCounter} accumulator.
+ *
+ * <p>Decrementing is not supported; both {@code dec} variants throw {@link
+ * UnsupportedOperationException}.
+ */
+public class FlinkCounter implements Counter {
+
+    private final String name;
+
+    private final LongCounter longCounter;
+
+    /**
+     * @param name the metric name returned by {@link #name()}
+     * @param longCounter the Flink accumulator that stores the count
+     */
+    public FlinkCounter(String name, LongCounter longCounter) {
+        this.name = name;
+        this.longCounter = longCounter;
+    }
+
+    /** Increments the counter by one. */
+    @Override
+    public void inc() {
+        inc(1L);
+    }
+
+    /** Adds {@code n} to the underlying accumulator. */
+    @Override
+    public void inc(long n) {
+        longCounter.add(n);
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void dec() {
+        throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void dec(long n) {
+        throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+    }
+
+    /** Replaces the locally accumulated value with {@code n}. */
+    @Override
+    public void set(long n) {
+        // Fixed: this used to call add(n) only, which incremented the counter instead of
+        // assigning it. Clear the local value first so the counter ends up holding exactly n.
+        longCounter.resetLocal();
+        longCounter.add(n);
+    }
+
+    /** Returns the local value of the underlying accumulator. */
+    @Override
+    public long getCount() {
+        return longCounter.getLocalValue();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** This counter always reports {@link Unit#COUNT}. */
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
new file mode 100644
index 0000000000..556f028ecf
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+
+/**
+ * Holds a Flink {@link JobExecutionResult} together with the job start and end time and renders
+ * them as a statistics table through {@link #toString()}.
+ *
+ * <p>Instances are created through {@link #builder()}.
+ */
+public final class FlinkJobMetricsSummary {
+
+    private final JobExecutionResult jobExecutionResult;
+
+    private final LocalDateTime jobStartTime;
+
+    private final LocalDateTime jobEndTime;
+
+    FlinkJobMetricsSummary(
+            JobExecutionResult jobExecutionResult,
+            LocalDateTime jobStartTime,
+            LocalDateTime jobEndTime) {
+        this.jobExecutionResult = jobExecutionResult;
+        this.jobStartTime = jobStartTime;
+        this.jobEndTime = jobEndTime;
+    }
+
+    /** Creates an empty {@link Builder}. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Collects the execution result and the raw {@code long} start/end times; the times are
+     * converted with {@code DateTimeUtils.parse} when {@link #build()} is called.
+     */
+    public static class Builder {
+
+        private JobExecutionResult jobExecutionResult;
+
+        private long jobStartTime;
+
+        private long jobEndTime;
+
+        private Builder() {}
+
+        public Builder jobExecutionResult(JobExecutionResult jobExecutionResult) {
+            this.jobExecutionResult = jobExecutionResult;
+            return this;
+        }
+
+        public Builder jobStartTime(long jobStartTime) {
+            this.jobStartTime = jobStartTime;
+            return this;
+        }
+
+        public Builder jobEndTime(long jobEndTime) {
+            this.jobEndTime = jobEndTime;
+            return this;
+        }
+
+        /** Converts both timestamps and assembles the summary. */
+        public FlinkJobMetricsSummary build() {
+            LocalDateTime startedAt = DateTimeUtils.parse(jobStartTime);
+            LocalDateTime finishedAt = DateTimeUtils.parse(jobEndTime);
+            return new FlinkJobMetricsSummary(jobExecutionResult, startedAt, finishedAt);
+        }
+    }
+
+    /**
+     * Formats start time, end time, elapsed seconds and the four read/write accumulator values
+     * as a table titled "Job Statistic Information".
+     */
+    @Override
+    public String toString() {
+        return StringFormatUtils.formatTable(
+                "Job Statistic Information",
+                "Start Time",
+                DateTimeUtils.toString(jobStartTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                "End Time",
+                DateTimeUtils.toString(jobEndTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                "Total Time(s)",
+                Duration.between(jobStartTime, jobEndTime).getSeconds(),
+                "Total Read Count",
+                accumulatorValue(MetricNames.SOURCE_RECEIVED_COUNT),
+                "Total Write Count",
+                accumulatorValue(MetricNames.SINK_WRITE_COUNT),
+                "Total Read Bytes",
+                accumulatorValue(MetricNames.SOURCE_RECEIVED_BYTES),
+                "Total Write Bytes",
+                accumulatorValue(MetricNames.SINK_WRITE_BYTES));
+    }
+
+    /** Looks up a single accumulator result of the job by its metric name. */
+    private Object accumulatorValue(String metricName) {
+        return jobExecutionResult.getAllAccumulatorResults().get(metricName);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMeter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMeter.java
new file mode 100644
index 0000000000..82b8de0fe6
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMeter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+/**
+ * SeaTunnel {@link Meter} that forwards every call to a wrapped {@code
+ * org.apache.flink.metrics.Meter}.
+ */
+public class FlinkMeter implements Meter {
+
+    private final String name;
+
+    /** The Flink meter all measurements are forwarded to. */
+    private final org.apache.flink.metrics.Meter delegate;
+
+    /**
+     * @param name the metric name returned by {@link #name()}
+     * @param meter the Flink meter to delegate to
+     */
+    public FlinkMeter(String name, org.apache.flink.metrics.Meter meter) {
+        this.name = name;
+        this.delegate = meter;
+    }
+
+    /** Marks a single event on the wrapped meter. */
+    @Override
+    public void markEvent() {
+        delegate.markEvent();
+    }
+
+    /** Marks {@code n} events on the wrapped meter. */
+    @Override
+    public void markEvent(long n) {
+        delegate.markEvent(n);
+    }
+
+    /** Returns the rate reported by the wrapped meter. */
+    @Override
+    public double getRate() {
+        return delegate.getRate();
+    }
+
+    /** Returns the event count reported by the wrapped meter. */
+    @Override
+    public long getCount() {
+        return delegate.getCount();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** This meter always reports {@link Unit#COUNT}. */
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
new file mode 100644
index 0000000000..3f32e641cc
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * SeaTunnel {@link MetricsContext} that creates its metrics from a Flink {@link
+ * StreamingRuntimeContext}.
+ *
+ * <p>Counters are backed by the long counter the runtime context returns for the metric name;
+ * meters are registered on the runtime context's metric group. Metrics are remembered by name, so
+ * asking for the same name again returns the instance that was registered first.
+ */
+public class FlinkMetricContext implements MetricsContext {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricContext.class);
+
+    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+
+    private final StreamingRuntimeContext runtimeContext;
+
+    /**
+     * @param runtimeContext the Flink runtime context used to create counters and meters
+     */
+    public FlinkMetricContext(StreamingRuntimeContext runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
+
+    /**
+     * Returns the counter registered under {@code name}, creating a {@link FlinkCounter} on first
+     * use.
+     */
+    @Override
+    public Counter counter(String name) {
+        // Single lookup instead of containsKey + get.
+        Metric existing = metrics.get(name);
+        if (existing != null) {
+            return (Counter) existing;
+        }
+        return this.counter(name, new FlinkCounter(name, runtimeContext.getLongCounter(name)));
+    }
+
+    /**
+     * Registers {@code counter} under {@code name} and returns the passed instance. If the name
+     * is already taken the existing registration is kept and a warning is logged.
+     */
+    @Override
+    public <C extends Counter> C counter(String name, C counter) {
+        this.addMetric(name, counter);
+        return counter;
+    }
+
+    /**
+     * Returns the meter registered under {@code name}, creating a {@link FlinkMeter} around a
+     * {@code MeterView(5)} on first use.
+     */
+    @Override
+    public Meter meter(String name) {
+        Metric existing = metrics.get(name);
+        if (existing != null) {
+            return (Meter) existing;
+        }
+        return this.meter(
+                name,
+                new FlinkMeter(
+                        name, runtimeContext.getMetricGroup().meter(name, new MeterView(5))));
+    }
+
+    /**
+     * Registers {@code meter} under {@code name} and returns the passed instance. If the name is
+     * already taken the existing registration is kept and a warning is logged.
+     */
+    @Override
+    public <M extends Meter> M meter(String name, M meter) {
+        this.addMetric(name, meter);
+        return meter;
+    }
+
+    /**
+     * Stores {@code metric} under {@code name} unless it is {@code null} or the name is already
+     * registered; both cases only log a warning.
+     */
+    protected void addMetric(String name, Metric metric) {
+        if (metric == null) {
+            LOGGER.warn("Ignoring attempted add of a metric due to being null for name {}.", name);
+            return;
+        }
+        // Fixed: the previous put-then-restore sequence briefly exposed the rejected metric to the
+        // unsynchronized lookups in counter(name)/meter(name). putIfAbsent never overwrites an
+        // existing entry and is atomic on ConcurrentHashMap, so no extra lock is needed.
+        Metric prior = this.metrics.putIfAbsent(name, metric);
+        if (prior != null) {
+            LOGGER.warn(
+                    "Name collision: MetricsContext already contains a Metric with the name '{}'. Metric will not be reported.",
+                    name);
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index a2082fd012..4a720e347b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.translation.flink.sink;
-import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -63,18 +62,22 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states)
throws IOException {
org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
- new DefaultSinkWriterContext(context.getSubtaskId());
+ new FlinkSinkWriterContext(context);
if (states == null || states.isEmpty()) {
return new FlinkSinkWriter<>(
- sink.createWriter(stContext), 1, catalogTable.getSeaTunnelRowType());
+ sink.createWriter(stContext),
+ 1,
+ catalogTable.getSeaTunnelRowType(),
+ stContext.getMetricsContext());
} else {
List<WriterStateT> restoredState =
states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
return new FlinkSinkWriter<>(
sink.restoreWriter(stContext, restoredState),
states.get(0).getCheckpointId() + 1,
- catalogTable.getSeaTunnelRowType());
+ catalogTable.getSeaTunnelRowType(),
+ stContext.getMetricsContext());
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 9a68f6505a..a5b9cc8fc0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.translation.flink.sink;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
@@ -46,21 +50,36 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT>
sinkWriter;
private final FlinkRowConverter rowSerialization;
+
+ private final Counter sinkWriteCount;
+
+ private final Counter sinkWriteBytes;
+
+ private final Meter sinkWriterQPS;
+
private long checkpointId;
FlinkSinkWriter(
org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter,
long checkpointId,
- SeaTunnelDataType<?> dataType) {
+ SeaTunnelDataType<?> dataType,
+ MetricsContext metricsContext) {
this.sinkWriter = sinkWriter;
this.checkpointId = checkpointId;
this.rowSerialization = new FlinkRowConverter(dataType);
+ this.sinkWriteCount = metricsContext.counter(MetricNames.SINK_WRITE_COUNT);
+ this.sinkWriteBytes = metricsContext.counter(MetricNames.SINK_WRITE_BYTES);
+ this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS);
}
@Override
public void write(InputT element, SinkWriter.Context context) throws IOException {
if (element instanceof Row) {
- sinkWriter.write(rowSerialization.reconvert((Row) element));
+ SeaTunnelRow seaTunnelRow = rowSerialization.reconvert((Row) element);
+ sinkWriter.write(seaTunnelRow);
+ sinkWriteCount.inc();
+ sinkWriteBytes.inc(seaTunnelRow.getBytesSize());
+ sinkWriterQPS.markEvent();
} else {
throw new InvalidClassException(
"only support Flink Row at now, the element Class is " + element.getClass());
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
new file mode 100644
index 0000000000..5f5a699a5c
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.lang.reflect.Field;
+
+/**
+ * SeaTunnel {@link SinkWriter.Context} backed by a Flink {@link Sink.InitContext}.
+ *
+ * <p>The metrics context is built from the {@link StreamingRuntimeContext} that is read
+ * reflectively from the init context, see {@link #getMetricsContext()}.
+ */
+public class FlinkSinkWriterContext implements SinkWriter.Context {
+
+    private final Sink.InitContext writerContext;
+
+    /**
+     * @param writerContext the Flink init context handed to the sink when the writer is created
+     */
+    public FlinkSinkWriterContext(InitContext writerContext) {
+        this.writerContext = writerContext;
+    }
+
+    /** Returns the subtask id reported by the wrapped Flink init context. */
+    @Override
+    public int getIndexOfSubtask() {
+        return writerContext.getSubtaskId();
+    }
+
+    /**
+     * Builds a new {@link FlinkMetricContext} from the runtime context found by following the
+     * private {@code context} field of the init context and then that object's {@code
+     * runtimeContext} field.
+     *
+     * @throws IllegalStateException if either field cannot be read or the value has an
+     *     unexpected type
+     */
+    @Override
+    public MetricsContext getMetricsContext() {
+        try {
+            Object contextImpl = readDeclaredField(writerContext, "context");
+            Object runtimeContext = readDeclaredField(contextImpl, "runtimeContext");
+            return new FlinkMetricContext((StreamingRuntimeContext) runtimeContext);
+        } catch (Exception e) {
+            throw new IllegalStateException("Initialize sink metrics failed", e);
+        }
+    }
+
+    /** Reads the field {@code fieldName} declared on {@code owner}'s class, ignoring access checks. */
+    private static Object readDeclaredField(Object owner, String fieldName)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = owner.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(owner);
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index b680e4b030..427cab0449 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -19,6 +19,10 @@ package org.apache.seatunnel.translation.flink.source;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,9 +45,19 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
private final FlowControlGate flowControlGate;
- public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType, Config envConfig) {
+ private final Counter sourceReadCount;
+
+ private final Counter sourceReadBytes;
+
+ private final Meter sourceReadQPS;
+
+ public FlinkRowCollector(
+ SeaTunnelRowType seaTunnelRowType, Config envConfig, MetricsContext metricsContext) {
this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
this.flowControlGate = FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
+ this.sourceReadCount = metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
+ this.sourceReadBytes = metricsContext.counter(MetricNames.SOURCE_RECEIVED_BYTES);
+ this.sourceReadQPS = metricsContext.meter(MetricNames.SOURCE_RECEIVED_QPS);
}
@Override
@@ -51,6 +65,9 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
flowControlGate.audit(record);
try {
readerOutput.collect(rowSerialization.convert(record));
+ sourceReadCount.inc();
+ sourceReadBytes.inc(record.getBytesSize());
+ sourceReadQPS.markEvent();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index 16b92fb920..65dc432477 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -62,7 +62,8 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
SeaTunnelRowType seaTunnelRowType) {
this.sourceReader = sourceReader;
this.context = context;
- this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType, envConfig);
+ this.flinkRowCollector =
+ new FlinkRowCollector(seaTunnelRowType, envConfig, context.getMetricsContext());
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
index 704622fdec..576a35e044 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
@@ -17,17 +17,20 @@
package org.apache.seatunnel.translation.flink.source;
-import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -84,7 +87,16 @@ public class FlinkSourceReaderContext implements SourceReader.Context {
@Override
public MetricsContext getMetricsContext() {
- return new AbstractMetricsContext() {};
+ try {
+ Field field = readerContext.getClass().getDeclaredField("this$0");
+ field.setAccessible(true);
+ AbstractStreamOperator<?> operator =
+ (AbstractStreamOperator<?>) field.get(readerContext);
+ StreamingRuntimeContext runtimeContext = operator.getRuntimeContext();
+ return new FlinkMetricContext(runtimeContext);
+ } catch (Exception e) {
+ throw new IllegalStateException("Initialize source metrics failed", e);
+ }
}
public boolean isSendNoMoreElementEvent() {