You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/12/24 07:33:23 UTC

(seatunnel) branch dev updated: [Feature][Core] Support record metrics in flink engine (#6035)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a9ec9d5cc5 [Feature][Core] Support record metrics in flink engine (#6035)
a9ec9d5cc5 is described below

commit a9ec9d5cc5ac402778de080bb4b39ce315a46c3c
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Sun Dec 24 15:33:17 2023 +0800

    [Feature][Core] Support record metrics in flink engine (#6035)
---
 .../seatunnel/common/utils/DateTimeUtils.java      |  16 +++
 .../seatunnel/common/utils/DateTimeUtilsTest.java  |  57 ++++++++
 .../starter/flink/execution/FlinkExecution.java    |  37 +++--
 .../e2e/connector/fake/FlinkMetricsIT.java         | 156 +++++++++++++++++++++
 .../fake_to_assert_verify_flink_metrics.conf       | 109 ++++++++++++++
 .../flink/AbstractTestFlinkContainer.java          |   3 +
 .../flink/metric/FlinkGroupCounter.java            |  73 ++++++++++
 .../flink/metric/FlinkMetricContext.java           | 120 ++++++++++++++++
 .../flink/sink/FlinkSinkWriterContext.java         |  74 ++++++++++
 .../translation/flink/metric/FlinkCounter.java     |  75 ++++++++++
 .../flink/metric/FlinkJobMetricsSummary.java       | 106 ++++++++++++++
 .../translation/flink/metric/FlinkMeter.java       |  63 +++++++++
 .../flink/metric/FlinkMetricContext.java           |  93 ++++++++++++
 .../translation/flink/sink/FlinkSink.java          |  11 +-
 .../translation/flink/sink/FlinkSinkWriter.java    |  23 ++-
 .../flink/sink/FlinkSinkWriterContext.java         |  58 ++++++++
 .../flink/source/FlinkRowCollector.java            |  19 ++-
 .../flink/source/FlinkSourceReader.java            |   3 +-
 .../flink/source/FlinkSourceReaderContext.java     |  16 ++-
 19 files changed, 1092 insertions(+), 20 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
index bf81ed626c..885634e8a7 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.common.utils;
 
+import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,10 +51,24 @@ public class DateTimeUtils {
         return LocalDateTime.parse(dateTime, FORMATTER_MAP.get(formatter));
     }
 
+    public static LocalDateTime parse(long timestamp) {
+        return parse(timestamp, ZoneId.systemDefault());
+    }
+
+    public static LocalDateTime parse(long timestamp, ZoneId zoneId) {
+        Instant instant = Instant.ofEpochMilli(timestamp);
+        return LocalDateTime.ofInstant(instant, zoneId);
+    }
+
     public static String toString(LocalDateTime dateTime, Formatter formatter) {
         return dateTime.format(FORMATTER_MAP.get(formatter));
     }
 
+    public static String toString(long timestamp, Formatter formatter) {
+        Instant instant = Instant.ofEpochMilli(timestamp);
+        return toString(LocalDateTime.ofInstant(instant, ZoneId.systemDefault()), formatter);
+    }
+
     public enum Formatter {
         YYYY_MM_DD_HH_MM_SS("yyyy-MM-dd HH:mm:ss"),
         YYYY_MM_DD_HH_MM_SS_SPOT("yyyy.MM.dd HH:mm:ss"),
diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java
new file mode 100644
index 0000000000..ef1f971ce5
--- /dev/null
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+public class DateTimeUtilsTest {
+
+    @Test
+    public void testParseDateString() {
+        final String datetime = "2023-12-22 00:00:00";
+        LocalDateTime parse = DateTimeUtils.parse(datetime, Formatter.YYYY_MM_DD_HH_MM_SS);
+        Assertions.assertEquals(0, parse.getMinute());
+        Assertions.assertEquals(0, parse.getHour());
+        Assertions.assertEquals(0, parse.getSecond());
+        Assertions.assertEquals(22, parse.getDayOfMonth());
+        Assertions.assertEquals(12, parse.getMonth().getValue());
+        Assertions.assertEquals(2023, parse.getYear());
+        Assertions.assertEquals(22, parse.getDayOfMonth());
+    }
+
+    @Test
+    public void testParseTimestamp() {
+        // 2023-12-22 12:55:20
+        final long timestamp = 1703220920013L;
+        LocalDateTime parse = DateTimeUtils.parse(timestamp, ZoneId.of("Asia/Shanghai"));
+
+        Assertions.assertEquals(55, parse.getMinute());
+        Assertions.assertEquals(12, parse.getHour());
+        Assertions.assertEquals(20, parse.getSecond());
+        Assertions.assertEquals(22, parse.getDayOfMonth());
+        Assertions.assertEquals(12, parse.getMonth().getValue());
+        Assertions.assertEquals(2023, parse.getYear());
+        Assertions.assertEquals(22, parse.getDayOfMonth());
+    }
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 39671af080..6b3fbd9b47 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -33,10 +33,13 @@ import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
 import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
 import org.apache.seatunnel.core.starter.execution.TaskExecution;
 import org.apache.seatunnel.core.starter.flink.FlinkStarter;
+import org.apache.seatunnel.translation.flink.metric.FlinkJobMetricsSummary;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 
-import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.MalformedURLException;
@@ -51,8 +54,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** Used to execute a SeaTunnelTask. */
-@Slf4j
 public class FlinkExecution implements TaskExecution {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
+
     private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
     private final PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment>
             sourcePluginExecuteProcessor;
@@ -109,20 +114,32 @@ public class FlinkExecution implements TaskExecution {
         dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
         dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
         sinkPluginExecuteProcessor.execute(dataStreams);
-        log.info(
+        LOGGER.info(
                 "Flink Execution Plan: {}",
                 flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
-        log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
+        LOGGER.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
         if (!flinkRuntimeEnvironment.isStreaming()) {
             flinkRuntimeEnvironment
                     .getStreamExecutionEnvironment()
                     .setRuntimeMode(RuntimeExecutionMode.BATCH);
-            log.info("Flink job Mode: {}", JobMode.BATCH);
+            LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
         }
         try {
-            flinkRuntimeEnvironment
-                    .getStreamExecutionEnvironment()
-                    .execute(flinkRuntimeEnvironment.getJobName());
+            final long jobStartTime = System.currentTimeMillis();
+            JobExecutionResult jobResult =
+                    flinkRuntimeEnvironment
+                            .getStreamExecutionEnvironment()
+                            .execute(flinkRuntimeEnvironment.getJobName());
+            final long jobEndTime = System.currentTimeMillis();
+
+            final FlinkJobMetricsSummary jobMetricsSummary =
+                    FlinkJobMetricsSummary.builder()
+                            .jobExecutionResult(jobResult)
+                            .jobStartTime(jobStartTime)
+                            .jobEndTime(jobEndTime)
+                            .build();
+
+            LOGGER.info("Job finished, execution result: \n{}", jobMetricsSummary);
         } catch (Exception e) {
             throw new TaskExecuteException("Execute Flink job error", e);
         }
@@ -170,9 +187,9 @@ public class FlinkExecution implements TaskExecution {
         for (URL jarUrl : jars) {
             if (new File(jarUrl.getFile()).exists()) {
                 validJars.add(jarUrl);
-                log.info("Inject jar to config: {}", jarUrl);
+                LOGGER.info("Inject jar to config: {}", jarUrl);
             } else {
-                log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
+                LOGGER.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
             }
         }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FlinkMetricsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FlinkMetricsIT.java
new file mode 100644
index 0000000000..abd059a8b8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fake/FlinkMetricsIT.java
@@ -0,0 +1,156 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.fake;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.flink.Flink13Container;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.SEATUNNEL})
+public class FlinkMetricsIT extends TestSuiteBase {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricsIT.class);
+
+    @TestTemplate
+    public void testFlinkMetrics(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult executeResult =
+                container.executeJob("/fake_to_assert_verify_flink_metrics.conf");
+        Assertions.assertEquals(0, executeResult.getExitCode());
+        final String jobListUrl = "http://%s:8081/jobs/overview";
+        final String jobDetailsUrl = "http://%s:8081/jobs/%s";
+        final String jobAccumulatorUrl = "http://%s:8081/jobs/%s/vertices/%s/accumulators";
+        final String jobManagerHost;
+        String dockerHost = System.getenv("DOCKER_HOST");
+        if (dockerHost == null) {
+            jobManagerHost = "localhost";
+        } else {
+            URI uri = URI.create(dockerHost);
+            jobManagerHost = uri.getHost();
+        }
+        // create http client
+        CloseableHttpClient httpClient = HttpClients.createDefault();
+
+        // get job id
+        HttpGet httpGet = new HttpGet(String.format(jobListUrl, jobManagerHost));
+        CloseableHttpResponse response = httpClient.execute(httpGet);
+        Assertions.assertEquals(response.getStatusLine().getStatusCode(), 200);
+        String responseContent = EntityUtils.toString(response.getEntity());
+        ObjectNode jsonNode = JsonUtils.parseObject(responseContent);
+        String jobId = jsonNode.get("jobs").get(0).get("jid").asText();
+        Assertions.assertNotNull(jobId);
+
+        // get job vertices
+        httpGet = new HttpGet(String.format(jobDetailsUrl, jobManagerHost, jobId));
+        response = httpClient.execute(httpGet);
+        Assertions.assertEquals(response.getStatusLine().getStatusCode(), 200);
+
+        responseContent = EntityUtils.toString(response.getEntity());
+        jsonNode = JsonUtils.parseObject(responseContent);
+        String verticeId = jsonNode.get("vertices").get(0).get("id").asText();
+
+        Awaitility.given()
+                .ignoreExceptions()
+                .await()
+                .atMost(10L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            HttpGet httpGetTemp =
+                                    new HttpGet(
+                                            String.format(
+                                                    jobAccumulatorUrl,
+                                                    jobManagerHost,
+                                                    jobId,
+                                                    verticeId));
+                            CloseableHttpResponse responseTemp = httpClient.execute(httpGetTemp);
+                            String responseContentTemp =
+                                    EntityUtils.toString(responseTemp.getEntity());
+                            JsonNode jsonNodeTemp = JsonUtils.parseObject(responseContentTemp);
+                            JsonNode metrics = jsonNodeTemp.get("user-accumulators");
+                            int size = metrics.size();
+                            if (size <= 0) {
+                                throw new IllegalStateException(
+                                        "Flink metrics not synchronized yet, next round");
+                            }
+                        });
+
+        // get metrics
+        httpGet = new HttpGet(String.format(jobAccumulatorUrl, jobManagerHost, jobId, verticeId));
+        response = httpClient.execute(httpGet);
+        responseContent = EntityUtils.toString(response.getEntity());
+        jsonNode = JsonUtils.parseObject(responseContent);
+        JsonNode metrics = jsonNode.get("user-accumulators");
+
+        int size = metrics.size();
+
+        Assertions.assertTrue(size > 0);
+
+        Map<String, String> metricsMap = new HashMap<>();
+
+        for (JsonNode metric : metrics) {
+            String name = metric.get("name").asText();
+            String value = metric.get("value").asText();
+            metricsMap.put(name, value);
+        }
+
+        String sourceReceivedCount = metricsMap.get(MetricNames.SOURCE_RECEIVED_COUNT);
+        String sourceReceivedBytes = metricsMap.get(MetricNames.SOURCE_RECEIVED_BYTES);
+
+        Assertions.assertEquals(5, Integer.valueOf(sourceReceivedCount));
+        Assertions.assertEquals(2160, Integer.valueOf(sourceReceivedBytes));
+
+        // Due to limitations in Flink 13 version and code, the metrics on the writer side cannot be
+        // aggregated into the global accumulator and can only be viewed in the operator based on
+        // parallelism dimensions
+        if (!(container instanceof Flink13Container)) {
+            String sinkWriteCount = metricsMap.get(MetricNames.SINK_WRITE_COUNT);
+            String sinkWriteBytes = metricsMap.get(MetricNames.SINK_WRITE_BYTES);
+            Assertions.assertEquals(5, Integer.valueOf(sinkWriteCount));
+            Assertions.assertEquals(2160, Integer.valueOf(sinkWriteBytes));
+        }
+
+        httpClient.close();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_verify_flink_metrics.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_verify_flink_metrics.conf
new file mode 100644
index 0000000000..c4e1798b48
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fake-e2e/src/test/resources/fake_to_assert_verify_flink_metrics.conf
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 5
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 9a0b353785..82e8e79c11 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -28,6 +28,7 @@ import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerLoggerFactory;
 
+import com.google.common.collect.Lists;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -85,6 +86,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
         copySeaTunnelStarterToContainer(jobManager);
         copySeaTunnelStarterLoggingToContainer(jobManager);
 
+        jobManager.setPortBindings(Lists.newArrayList(String.format("%s:%s", 8081, 8081)));
+
         taskManager =
                 new GenericContainer<>(dockerImage)
                         .withCommand("taskmanager")
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkGroupCounter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkGroupCounter.java
new file mode 100644
index 0000000000..bcda819ee9
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkGroupCounter.java
@@ -0,0 +1,73 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+public class FlinkGroupCounter implements Counter {
+
+    private final String name;
+
+    private final org.apache.flink.metrics.Counter counter;
+
+    public FlinkGroupCounter(String name, org.apache.flink.metrics.Counter counter) {
+        this.name = name;
+        this.counter = counter;
+    }
+
+    @Override
+    public void inc() {
+        counter.inc();
+    }
+
+    @Override
+    public void inc(long n) {
+        counter.inc(n);
+    }
+
+    @Override
+    public void dec() {
+        throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+    }
+
+    @Override
+    public void dec(long n) {
+        throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+    }
+
+    @Override
+    public void set(long n) {
+        throw new UnsupportedOperationException("Flink metrics does not support set operation");
+    }
+
+    @Override
+    public long getCount() {
+        return counter.getCount();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
new file mode 100644
index 0000000000..00316eb7de
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
@@ -0,0 +1,120 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
+import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FlinkMetricContext implements MetricsContext {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricContext.class);
+
+    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+
+    private MetricGroup metricGroup;
+
+    private StreamingRuntimeContext runtimeContext;
+
+    public FlinkMetricContext(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    public FlinkMetricContext(StreamingRuntimeContext runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
+
+    @Override
+    public Counter counter(String name) {
+        if (metrics.containsKey(name)) {
+            return (Counter) metrics.get(name);
+        }
+        Counter counter =
+                runtimeContext == null
+                        ? new FlinkGroupCounter(name, metricGroup.counter(name))
+                        : new FlinkCounter(name, runtimeContext.getLongCounter(name));
+        return this.counter(name, counter);
+    }
+
+    @Override
+    public <C extends Counter> C counter(String name, C counter) {
+        this.addMetric(name, counter);
+        return counter;
+    }
+
+    @Override
+    public Meter meter(String name) {
+        if (metrics.containsKey(name)) {
+            return (Meter) metrics.get(name);
+        }
+
+        // Why use reflection to obtain metrics group?
+        // Because the value types returned by flink 1.13 and 1.14 runtimeContext.getMetricGroup()
+        // are inconsistent
+        org.apache.flink.metrics.Meter meter;
+        if (runtimeContext == null) {
+            meter = metricGroup.meter(name, new MeterView(5));
+        } else {
+            try {
+                Field field = AbstractRuntimeUDFContext.class.getDeclaredField("metrics");
+                field.setAccessible(true);
+                MetricGroup mg = (MetricGroup) field.get(runtimeContext);
+                meter = mg.meter(name, new MeterView(5));
+            } catch (Exception e) {
+                throw new IllegalStateException("Initial meter failed", e);
+            }
+        }
+        return this.meter(name, new FlinkMeter(name, meter));
+    }
+
+    @Override
+    public <M extends Meter> M meter(String name, M meter) {
+        this.addMetric(name, meter);
+        return meter;
+    }
+
+    protected void addMetric(String name, Metric metric) {
+        if (metric == null) {
+            LOGGER.warn("Ignoring attempted add of a metric due to being null for name {}.", name);
+            return;
+        }
+        synchronized (this) {
+            Metric prior = this.metrics.put(name, metric);
+            if (prior != null) {
+                this.metrics.put(name, prior);
+                LOGGER.warn(
+                        "Name collision: MetricsContext already contains a Metric with the name '"
+                                + name
+                                + "'. Metric will not be reported.");
+            }
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
new file mode 100644
index 0000000000..95f72e8ed5
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
@@ -0,0 +1,74 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
+
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+
+public class FlinkSinkWriterContext implements SinkWriter.Context {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricContext.class);
+
+    private final InitContext writerContext;
+
+    public FlinkSinkWriterContext(InitContext writerContext) {
+        this.writerContext = writerContext;
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return writerContext.getSubtaskId();
+    }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        // In flink 1.14, it has contained runtimeContext in InitContext, so first step to detect if
+        // it is existed
+        try {
+            Field field = writerContext.getClass().getDeclaredField("runtimeContext");
+            field.setAccessible(true);
+            StreamingRuntimeContext runtimeContext =
+                    (StreamingRuntimeContext) field.get(writerContext);
+            return new FlinkMetricContext(runtimeContext);
+        } catch (Exception e) {
+            LOGGER.info(
+                    "Flink version is not 1.14.x, will initial MetricsContext using metricGroup");
+        }
+        // Why use reflection to obtain metrics group?
+        // Because the value types returned by flink 1.13 and 1.14 InitContext.getMetricGroup()
+        // are inconsistent
+        try {
+            Field field = writerContext.getClass().getDeclaredField("metricGroup");
+            field.setAccessible(true);
+            MetricGroup metricGroup = (MetricGroup) field.get(writerContext);
+            return new FlinkMetricContext(metricGroup);
+        } catch (Exception e) {
+            throw new IllegalStateException("Initial sink metrics failed", e);
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkCounter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkCounter.java
new file mode 100644
index 0000000000..296f46cb56
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkCounter.java
@@ -0,0 +1,75 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+
+public class FlinkCounter implements Counter {
+
+    private final String name;
+
+    private final LongCounter longCounter;
+
+    public FlinkCounter(String name, LongCounter longCounter) {
+        this.name = name;
+        this.longCounter = longCounter;
+    }
+
+    @Override
+    public void inc() {
+        inc(1L);
+    }
+
+    @Override
+    public void inc(long n) {
+        longCounter.add(n);
+    }
+
+    @Override
+    public void dec() {
+        throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+    }
+
+    @Override
+    public void dec(long n) {
+        throw new UnsupportedOperationException("Flink metrics does not support dec operation");
+    }
+
+    @Override
+    public void set(long n) {
+        longCounter.add(n);
+    }
+
+    @Override
+    public long getCount() {
+        return longCounter.getLocalValue();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
new file mode 100644
index 0000000000..556f028ecf
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkJobMetricsSummary.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
+
+import org.apache.flink.api.common.JobExecutionResult;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+
+public final class FlinkJobMetricsSummary {
+
+    private final JobExecutionResult jobExecutionResult;
+
+    private final LocalDateTime jobStartTime;
+
+    private final LocalDateTime jobEndTime;
+
+    FlinkJobMetricsSummary(
+            JobExecutionResult jobExecutionResult,
+            LocalDateTime jobStartTime,
+            LocalDateTime jobEndTime) {
+        this.jobExecutionResult = jobExecutionResult;
+        this.jobStartTime = jobStartTime;
+        this.jobEndTime = jobEndTime;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private JobExecutionResult jobExecutionResult;
+
+        private long jobStartTime;
+
+        private long jobEndTime;
+
+        private Builder() {}
+
+        public Builder jobExecutionResult(JobExecutionResult jobExecutionResult) {
+            this.jobExecutionResult = jobExecutionResult;
+            return this;
+        }
+
+        public Builder jobStartTime(long jobStartTime) {
+            this.jobStartTime = jobStartTime;
+            return this;
+        }
+
+        public Builder jobEndTime(long jobEndTime) {
+            this.jobEndTime = jobEndTime;
+            return this;
+        }
+
+        public FlinkJobMetricsSummary build() {
+            return new FlinkJobMetricsSummary(
+                    jobExecutionResult,
+                    DateTimeUtils.parse(jobStartTime),
+                    DateTimeUtils.parse(jobEndTime));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return StringFormatUtils.formatTable(
+                "Job Statistic Information",
+                "Start Time",
+                DateTimeUtils.toString(jobStartTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                "End Time",
+                DateTimeUtils.toString(jobEndTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+                "Total Time(s)",
+                Duration.between(jobStartTime, jobEndTime).getSeconds(),
+                "Total Read Count",
+                jobExecutionResult
+                        .getAllAccumulatorResults()
+                        .get(MetricNames.SOURCE_RECEIVED_COUNT),
+                "Total Write Count",
+                jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_COUNT),
+                "Total Read Bytes",
+                jobExecutionResult
+                        .getAllAccumulatorResults()
+                        .get(MetricNames.SOURCE_RECEIVED_BYTES),
+                "Total Write Bytes",
+                jobExecutionResult.getAllAccumulatorResults().get(MetricNames.SINK_WRITE_BYTES));
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMeter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMeter.java
new file mode 100644
index 0000000000..82b8de0fe6
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMeter.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Unit;
+
+public class FlinkMeter implements Meter {
+
+    private final String name;
+
+    private final org.apache.flink.metrics.Meter meter;
+
+    public FlinkMeter(String name, org.apache.flink.metrics.Meter meter) {
+        this.name = name;
+        this.meter = meter;
+    }
+
+    @Override
+    public void markEvent() {
+        meter.markEvent();
+    }
+
+    @Override
+    public void markEvent(long n) {
+        meter.markEvent(n);
+    }
+
+    @Override
+    public double getRate() {
+        return meter.getRate();
+    }
+
+    @Override
+    public long getCount() {
+        return meter.getCount();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Unit unit() {
+        return Unit.COUNT;
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
new file mode 100644
index 0000000000..3f32e641cc
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/metric/FlinkMetricContext.java
@@ -0,0 +1,93 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.metric;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.Metric;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FlinkMetricContext implements MetricsContext {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkMetricContext.class);
+
+    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+
+    private final StreamingRuntimeContext runtimeContext;
+
+    public FlinkMetricContext(StreamingRuntimeContext runtimeContext) {
+        this.runtimeContext = runtimeContext;
+    }
+
+    @Override
+    public Counter counter(String name) {
+        if (metrics.containsKey(name)) {
+            return (Counter) metrics.get(name);
+        }
+        return this.counter(name, new FlinkCounter(name, runtimeContext.getLongCounter(name)));
+    }
+
+    @Override
+    public <C extends Counter> C counter(String name, C counter) {
+        this.addMetric(name, counter);
+        return counter;
+    }
+
+    @Override
+    public Meter meter(String name) {
+        if (metrics.containsKey(name)) {
+            return (Meter) metrics.get(name);
+        }
+        return this.meter(
+                name,
+                new FlinkMeter(
+                        name, runtimeContext.getMetricGroup().meter(name, new MeterView(5))));
+    }
+
+    @Override
+    public <M extends Meter> M meter(String name, M meter) {
+        this.addMetric(name, meter);
+        return meter;
+    }
+
+    protected void addMetric(String name, Metric metric) {
+        if (metric == null) {
+            LOGGER.warn("Ignoring attempted add of a metric due to being null for name {}.", name);
+        } else {
+            synchronized (this) {
+                Metric prior = this.metrics.put(name, metric);
+                if (prior != null) {
+                    this.metrics.put(name, prior);
+                    LOGGER.warn(
+                            "Name collision: MetricsContext already contains a Metric with the name '"
+                                    + name
+                                    + "'. Metric will not be reported.");
+                }
+            }
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
index a2082fd012..4a720e347b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.translation.flink.sink;
 
-import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -63,18 +62,22 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
             Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states)
             throws IOException {
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-                new DefaultSinkWriterContext(context.getSubtaskId());
+                new FlinkSinkWriterContext(context);
 
         if (states == null || states.isEmpty()) {
             return new FlinkSinkWriter<>(
-                    sink.createWriter(stContext), 1, catalogTable.getSeaTunnelRowType());
+                    sink.createWriter(stContext),
+                    1,
+                    catalogTable.getSeaTunnelRowType(),
+                    stContext.getMetricsContext());
         } else {
             List<WriterStateT> restoredState =
                     states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
             return new FlinkSinkWriter<>(
                     sink.restoreWriter(stContext, restoredState),
                     states.get(0).getCheckpointId() + 1,
-                    catalogTable.getSeaTunnelRowType());
+                    catalogTable.getSeaTunnelRowType(),
+                    stContext.getMetricsContext());
         }
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 9a68f6505a..a5b9cc8fc0 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.translation.flink.sink;
 
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
@@ -46,21 +50,36 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
     private final org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT>
             sinkWriter;
     private final FlinkRowConverter rowSerialization;
+
+    private final Counter sinkWriteCount;
+
+    private final Counter sinkWriteBytes;
+
+    private final Meter sinkWriterQPS;
+
     private long checkpointId;
 
     FlinkSinkWriter(
             org.apache.seatunnel.api.sink.SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter,
             long checkpointId,
-            SeaTunnelDataType<?> dataType) {
+            SeaTunnelDataType<?> dataType,
+            MetricsContext metricsContext) {
         this.sinkWriter = sinkWriter;
         this.checkpointId = checkpointId;
         this.rowSerialization = new FlinkRowConverter(dataType);
+        this.sinkWriteCount = metricsContext.counter(MetricNames.SINK_WRITE_COUNT);
+        this.sinkWriteBytes = metricsContext.counter(MetricNames.SINK_WRITE_BYTES);
+        this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS);
     }
 
     @Override
     public void write(InputT element, SinkWriter.Context context) throws IOException {
         if (element instanceof Row) {
-            sinkWriter.write(rowSerialization.reconvert((Row) element));
+            SeaTunnelRow seaTunnelRow = rowSerialization.reconvert((Row) element);
+            sinkWriter.write(seaTunnelRow);
+            sinkWriteCount.inc();
+            sinkWriteBytes.inc(seaTunnelRow.getBytesSize());
+            sinkWriterQPS.markEvent();
         } else {
             throw new InvalidClassException(
                     "only support Flink Row at now, the element Class is " + element.getClass());
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
new file mode 100644
index 0000000000..5f5a699a5c
--- /dev/null
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriterContext.java
@@ -0,0 +1,58 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.seatunnel.translation.flink.sink;
+
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Sink.InitContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.lang.reflect.Field;
+
+public class FlinkSinkWriterContext implements SinkWriter.Context {
+
+    private final Sink.InitContext writerContext;
+
+    public FlinkSinkWriterContext(InitContext writerContext) {
+        this.writerContext = writerContext;
+    }
+
+    @Override
+    public int getIndexOfSubtask() {
+        return writerContext.getSubtaskId();
+    }
+
+    @Override
+    public MetricsContext getMetricsContext() {
+        try {
+            Field contextImplField = writerContext.getClass().getDeclaredField("context");
+            contextImplField.setAccessible(true);
+            Object contextImpl = contextImplField.get(writerContext);
+            Field runtimeContextField = contextImpl.getClass().getDeclaredField("runtimeContext");
+            runtimeContextField.setAccessible(true);
+            StreamingRuntimeContext runtimeContext =
+                    (StreamingRuntimeContext) runtimeContextField.get(contextImpl);
+            return new FlinkMetricContext(runtimeContext);
+        } catch (Exception e) {
+            throw new IllegalStateException("Initialize sink metrics failed", e);
+        }
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index b680e4b030..427cab0449 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -19,6 +19,10 @@ package org.apache.seatunnel.translation.flink.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricNames;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -41,9 +45,19 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
 
     private final FlowControlGate flowControlGate;
 
-    public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType, Config envConfig) {
+    private final Counter sourceReadCount;
+
+    private final Counter sourceReadBytes;
+
+    private final Meter sourceReadQPS;
+
+    public FlinkRowCollector(
+            SeaTunnelRowType seaTunnelRowType, Config envConfig, MetricsContext metricsContext) {
         this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
         this.flowControlGate = FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
+        this.sourceReadCount = metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
+        this.sourceReadBytes = metricsContext.counter(MetricNames.SOURCE_RECEIVED_BYTES);
+        this.sourceReadQPS = metricsContext.meter(MetricNames.SOURCE_RECEIVED_QPS);
     }
 
     @Override
@@ -51,6 +65,9 @@ public class FlinkRowCollector implements Collector<SeaTunnelRow> {
         flowControlGate.audit(record);
         try {
             readerOutput.collect(rowSerialization.convert(record));
+            sourceReadCount.inc();
+            sourceReadBytes.inc(record.getBytesSize());
+            sourceReadQPS.markEvent();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index 16b92fb920..65dc432477 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -62,7 +62,8 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
             SeaTunnelRowType seaTunnelRowType) {
         this.sourceReader = sourceReader;
         this.context = context;
-        this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType, envConfig);
+        this.flinkRowCollector =
+                new FlinkRowCollector(seaTunnelRowType, envConfig, context.getMetricsContext());
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
index 704622fdec..576a35e044 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReaderContext.java
@@ -17,17 +17,20 @@
 
 package org.apache.seatunnel.translation.flink.source;
 
-import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.translation.flink.metric.FlinkMetricContext;
 
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -84,7 +87,16 @@ public class FlinkSourceReaderContext implements SourceReader.Context {
 
     @Override
     public MetricsContext getMetricsContext() {
-        return new AbstractMetricsContext() {};
+        try {
+            Field field = readerContext.getClass().getDeclaredField("this$0");
+            field.setAccessible(true);
+            AbstractStreamOperator<?> operator =
+                    (AbstractStreamOperator<?>) field.get(readerContext);
+            StreamingRuntimeContext runtimeContext = operator.getRuntimeContext();
+            return new FlinkMetricContext(runtimeContext);
+        } catch (Exception e) {
+            throw new IllegalStateException("Initialize source metrics failed", e);
+        }
     }
 
     public boolean isSendNoMoreElementEvent() {