Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/27 03:47:55 UTC

[incubator-seatunnel] branch st-metrics updated: [Core][Metrics] Add Seatunnel Metrics module (#2888)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-metrics
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-metrics by this push:
     new 0567e2af2 [Core][Metrics] Add Seatunnel Metrics module (#2888)
0567e2af2 is described below

commit 0567e2af27461bed76b1658da290225efcefbafd
Author: lvlv <40...@users.noreply.github.com>
AuthorDate: Thu Oct 27 11:47:50 2022 +0800

    [Core][Metrics] Add Seatunnel Metrics module (#2888)
    
    * seatunnel-metrics init commit
    
    * seatunnel-metrics config add
    
    * codeStyle update
    
    * codeStyle update again
    
    * codeStyle seatunnel-spark update
    
    * [Improve][Connector-V2] Parameter verification for connector V2 kafka sink (#2866)
    
    * parameter verification
    
    * update
    
    * update
    
    * [Improve][DOC] Perfect the connector v2 doc (#2800)
    
    * [Improve][DOC] Perfect the connector v2 doc
    
    * Update seatunnel-connectors-v2/README.zh.md
    
    Co-authored-by: Hisoka <fa...@qq.com>
    
    * [Improve][DOC] A little tinkering
    
    * [Improve][DOC] A little tinkering
    
    * [Doc][connector] add Console sink doc
    
    close #2794
    
    * [Doc][connector] add Console sink doc
    
    close #2794
    
    * fix some problem
    
    * fix some problem
    
    * fine tuning
    
    Co-authored-by: Hisoka <fa...@qq.com>
    
    * add seatunnel-examples from gitignore (#2892)
    
    * [Improve][connector-jdbc] Calculate splits only once in JdbcSourceSplitEnumerator (#2900)
    
    * [Bug][Connector-V2] Fix wechat sink data serialization (#2856)
    
    * [Improve][Connector-V2] Improve orc write strategy to support all data types (#2860)
    
    * [Improve][Connector-V2] Improve orc write strategy to support all data types
    
    Co-authored-by: tyrantlucifer <ty...@gmail.com>
    
    * [Bug][seatunnel-translation-base] Fix Source restore state NPE (#2878)
    
    * [Improve][Connector-v2-Fake]Supports direct definition of data values(row) (#2839)
    
    * [Improve][Connector-v2]Supports direct definition of data values(row)
    
    * seatunnel-prometheus update
    
    * seatunnel-prometheus update
    
    * seatunnel-prometheus update
    
    * 1. Unify SeaTunnel configuration naming
    2. Use reflection to automate assembly
    3. Modify the Flink/Spark startup functions
    4. Try packaging configuration (todo)
    
    Co-authored-by: TaoZex <45...@users.noreply.github.com>
    Co-authored-by: liugddx <80...@qq.com>
    Co-authored-by: Hisoka <fa...@qq.com>
    Co-authored-by: Eric <ga...@gmail.com>
    Co-authored-by: Xiao Zhao <zh...@163.com>
    Co-authored-by: hailin0 <wa...@apache.org>
    Co-authored-by: tyrantlucifer <ty...@gmail.com>
    Co-authored-by: Laglangyue <35...@users.noreply.github.com>
---
 .gitignore                                         |   2 +
 docs/en/connector-v2/source/FakeSource.md          |   8 +
 pom.xml                                            |   1 +
 seatunnel-api/pom.xml                              |   7 +-
 .../apache/seatunnel/flink/FlinkEnvironment.java   |  96 +++++++-
 .../apache/seatunnel/flink/util/ConfigKeyName.java |   5 +
 .../apache/seatunnel/spark/SparkEnvironment.java   |  22 ++
 .../common/constants/CollectionConstants.java      |   5 +
 seatunnel-connectors-v2/README.md                  | 170 +++++++++++----
 seatunnel-connectors-v2/README.zh.md               |  97 +++++++--
 .../seatunnel/fake/source/FakeOptions.java         |  23 +-
 .../seatunnel/fake/source/FakeSource.java          |   4 +-
 .../seatunnel/fake/source/FakeSourceReader.java    |   6 +-
 .../file/sink/writer/OrcWriteStrategy.java         | 193 ++++++++++++++---
 .../seatunnel/http/sink/HttpSinkWriter.java        |   8 +-
 .../sink/WeChatBotMessageSerializationSchema.java  |  71 ++++++
 .../wechat/sink/WeChatHttpSinkWriter.java          |  70 ------
 .../seatunnel/wechat/sink/WeChatSink.java          |  25 +--
 .../jdbc/source/JdbcSourceSplitEnumerator.java     |  43 ++--
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  10 +
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |   3 +-
 seatunnel-core/seatunnel-flink-starter/pom.xml     |   6 +
 seatunnel-core/seatunnel-spark-starter/pom.xml     |   6 +
 .../src/main/assembly/assembly-bin-ci.xml          |  12 +
 seatunnel-dist/src/main/assembly/assembly-bin.xml  |  12 +
 .../resources/assertion/fakesource_to_assert.conf  |   1 +
 .../e2e/spark/v2/file/FakeSourceToFileIT.java      |   8 +
 .../resources/file/fakesource_to_local_orc.conf    |  56 ++---
 .../file/local_orc_source_to_console.conf          |  61 ++++--
 .../main/resources/examples/fake_to_console.conf   |  11 +-
 .../seatunnel-flink-examples/pom.xml               |   7 +-
 .../main/resources/examples/fake_to_console.conf   |   8 +-
 .../src/main/resources/examples/spark.batch.conf   |   5 +
 .../seatunnel-spark-examples/pom.xml               |   7 +-
 .../src/main/resources/examples/spark.batch.conf   |   5 +
 seatunnel-metrics/pom.xml                          |  21 ++
 .../seatunnel-metrics-console/pom.xml              |  23 ++
 .../metrics/console/ConsoleLogReporter.java        | 115 ++++++++++
 seatunnel-metrics/seatunnel-metrics-core/pom.xml   |  21 ++
 .../org/apache/seatunnel/metrics/core/Counter.java |  18 ++
 .../org/apache/seatunnel/metrics/core/Gauge.java   |  10 +
 .../apache/seatunnel/metrics/core/Histogram.java   |  21 ++
 .../org/apache/seatunnel/metrics/core/Meter.java   |  13 ++
 .../org/apache/seatunnel/metrics/core/Metric.java  |   6 +
 .../seatunnel/metrics/core/MetricConfig.java       |  50 +++++
 .../apache/seatunnel/metrics/core/MetricInfo.java  |  55 +++++
 .../apache/seatunnel/metrics/core/MetricType.java  |   9 +
 .../seatunnel/metrics/core/SimpleCounter.java      |  29 +++
 .../apache/seatunnel/metrics/core/SimpleGauge.java |  15 ++
 .../seatunnel/metrics/core/SimpleHistogram.java    |  73 +++++++
 .../apache/seatunnel/metrics/core/SimpleMeter.java |  22 ++
 .../metrics/core/reporter/MetricReporter.java      |  23 ++
 seatunnel-metrics/seatunnel-metrics-flink/pom.xml  |  41 ++++
 .../metrics/flink/AbstractSeatunnelReporter.java   | 106 +++++++++
 .../metrics/flink/SeatunnelMetricReporter.java     | 123 +++++++++++
 .../seatunnel-metrics-prometheus/pom.xml           |  32 +++
 .../prometheus/PrometheusPushGatewayReporter.java  | 230 ++++++++++++++++++++
 seatunnel-metrics/seatunnel-metrics-spark/pom.xml  |  41 ++++
 .../metrics/spark/SeatunnelMetricSink.scala        | 241 +++++++++++++++++++++
 .../metrics/sink/SeatunnelMetricSink.scala         |  47 ++++
 .../translation/source/CoordinatedSource.java      |  13 +-
 .../translation/source/ParallelSource.java         |  13 +-
 62 files changed, 2195 insertions(+), 290 deletions(-)

diff --git a/.gitignore b/.gitignore
index 757280ec8..25977068e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,3 +47,5 @@ Test.scala
 test.conf
 spark-warehouse
 *.flattened-pom.xml
+
+seatunnel-examples
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 3c66ce679..c4bc5057c 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -22,15 +22,21 @@ just for testing, such as type conversion and feature testing
 |-------------------|--------|----------|---------------|
 | result_table_name | string | yes      | -             |
 | schema            | config | yes      | -             |
+| row.num           | long   | no       | 10            |
 
 ### result_table_name [string]
 
 The table name.
 
 ### type [string]
+
 Table structure description. You should assign the schema option to tell the connector how to parse data into the rows you want.  
 **Tips**: Most unstructured data sources contain this param, such as LocalFile, HdfsFile.  
 **Example**:
+
+### row.num [long]
+
+Number of rows of generated data. The default value is 10.
+
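+A minimal usage sketch (the option name and default come from this change; all other values are illustrative):
+
+```hocon
+FakeSource {
+  result_table_name = "fake"
+  row.num = 100
+  schema = {
+    fields {
+      name = "string"
+      age = "int"
+    }
+  }
+}
+```
+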
 ```hocon
 schema = {
       fields {
@@ -55,7 +61,9 @@ schema = {
 ```
 
 ## Example
+
 Simple FakeSource example which contains enough data types
+
 ```hocon
 source {
   FakeSource {
diff --git a/pom.xml b/pom.xml
index e54d67d69..40a7798d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
         <module>seatunnel-formats</module>
         <module>seatunnel-dist</module>
 	    <module>seatunnel-server</module>
+        <module>seatunnel-metrics</module>
     </modules>
 
     <profiles>
diff --git a/seatunnel-api/pom.xml b/seatunnel-api/pom.xml
index 9c6a62266..71b344778 100644
--- a/seatunnel-api/pom.xml
+++ b/seatunnel-api/pom.xml
@@ -43,5 +43,10 @@
             <artifactId>jackson-dataformat-properties</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-core</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 49c97beb6..7cd3046e9 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.flink;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.flink.util.ConfigKeyName;
@@ -29,6 +32,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -47,6 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -126,8 +131,8 @@ public class FlinkEnvironment implements RuntimeEnv {
         List<Configuration> configurations = new ArrayList<>();
         try {
             configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
-                    "getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " +
-                    "method: getConfiguration")).invoke(this.environment));
+                "getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " +
+                "method: getConfiguration")).invoke(this.environment));
             if (!isStreaming()) {
                 configurations.add(batchEnvironment.getConfiguration());
             }
@@ -161,9 +166,9 @@ public class FlinkEnvironment implements RuntimeEnv {
     private void createStreamTableEnvironment() {
         // use blink and streammode
         EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
-                .inStreamingMode();
+            .inStreamingMode();
         if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink"
-                .equals(this.config.getString(ConfigKeyName.PLANNER))) {
+            .equals(this.config.getString(ConfigKeyName.PLANNER))) {
             envBuilder.useBlinkPlanner();
         } else {
             envBuilder.useOldPlanner();
@@ -173,7 +178,7 @@ public class FlinkEnvironment implements RuntimeEnv {
         tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
         TableConfig config = tableEnvironment.getConfig();
         if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config
-                .hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
+            .hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
             long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
             long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
             config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
@@ -181,7 +186,7 @@ public class FlinkEnvironment implements RuntimeEnv {
     }
 
     private void createStreamEnvironment() {
-        environment = StreamExecutionEnvironment.getExecutionEnvironment();
+        environment = createMetricEnvironment();
         setTimeCharacteristic();
 
         setCheckpoint();
@@ -244,8 +249,8 @@ public class FlinkEnvironment implements RuntimeEnv {
                     break;
                 default:
                     LOGGER.warn(
-                            "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
-                            timeType);
+                        "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
+                        timeType);
                     break;
             }
         }
@@ -268,8 +273,8 @@ public class FlinkEnvironment implements RuntimeEnv {
                         break;
                     default:
                         LOGGER.warn(
-                                "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
-                                mode);
+                            "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
+                            mode);
                         break;
                 }
             }
@@ -302,10 +307,10 @@ public class FlinkEnvironment implements RuntimeEnv {
                 boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
                 if (cleanup) {
                     checkpointConfig.enableExternalizedCheckpoints(
-                            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+                        CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
                 } else {
                     checkpointConfig.enableExternalizedCheckpoints(
-                            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
                 }
             }
@@ -322,4 +327,71 @@ public class FlinkEnvironment implements RuntimeEnv {
         }
     }
 
+    public StreamExecutionEnvironment createMetricEnvironment() {
+
+        if (!config.hasPath(CollectionConstants.METRICS_CLASS)) {
+            return StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+
+        Configuration seatunnelReporter = initMetricConfig();
+
+        return StreamExecutionEnvironment.getExecutionEnvironment(seatunnelReporter);
+
+    }
+
+    private Configuration initMetricConfig() {
+        final int defaultDuration = 10;
+        //Build flink-metrics parameters
+        ConfigOption<String> reportersList =
+            key("metrics.reporters")
+                .stringType()
+                .noDefaultValue();
+
+        ConfigOption<String> reporterClass =
+            key("metrics.reporter.seatunnel_reporter.class")
+                .stringType()
+                .noDefaultValue();
+        ConfigOption<Duration> reporterInterval =
+            key("metrics.reporter.seatunnel_reporter.interval")
+                .durationType()
+                .defaultValue(Duration.ofSeconds(defaultDuration));
+
+        ConfigOption<String> reporterConfigPort =
+            key("metrics.reporter.seatunnel_reporter.port")
+                .stringType()
+                .noDefaultValue();
+        ConfigOption<String> reporterConfigHost =
+            key("metrics.reporter.seatunnel_reporter.host")
+                .stringType()
+                .noDefaultValue();
+        ConfigOption<String> reporterConfigJobName =
+            key("metrics.reporter.seatunnel_reporter.jobName")
+                .stringType()
+                .noDefaultValue();
+        ConfigOption<String> reporterConfigReporterName =
+                key("metrics.reporter.seatunnel_reporter.reporterName")
+                        .stringType()
+                        .noDefaultValue();
+
+        Configuration seatunnelReporter = new Configuration().set(reportersList, "seatunnel_reporter").set(reporterClass, "org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter");
+        if (config.hasPath(CollectionConstants.METRICS_INTERVAL)) {
+            Duration duration = Duration.ofSeconds(config.getLong(CollectionConstants.METRICS_INTERVAL));
+            seatunnelReporter.set(reporterInterval, duration);
+        }
+
+        if (config.hasPath(CollectionConstants.METRICS_PORT)) {
+            seatunnelReporter.set(reporterConfigPort, config.getString(CollectionConstants.METRICS_PORT));
+        }
+
+        if (config.hasPath(CollectionConstants.METRICS_HOST)) {
+            seatunnelReporter.set(reporterConfigHost, config.getString(CollectionConstants.METRICS_HOST));
+        }
+
+        if (config.hasPath(CollectionConstants.METRICS_JOB_NAME)) {
+            seatunnelReporter.set(reporterConfigJobName, config.getString(CollectionConstants.METRICS_JOB_NAME));
+        }
+        seatunnelReporter.set(reporterConfigReporterName, config.getString(CollectionConstants.METRICS_CLASS));
+        return seatunnelReporter;
+    }
+
 }
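
For reference, the method above is driven by the `seatunnel.metrics.*` keys defined in CollectionConstants and read
from the job configuration. A minimal sketch of such a config (the key names come from this change; their placement
and the values are assumptions):

```hocon
env {
  # reporter name consumed by the metrics module (hypothetical value)
  seatunnel.metrics.class = "console"
  # reporting interval in seconds
  seatunnel.metrics.interval = 10
  # target for push-style reporters (hypothetical values)
  seatunnel.metrics.host = "127.0.0.1"
  seatunnel.metrics.port = "9091"
  seatunnel.metrics.jobName = "fake_to_console"
}
```
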
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
index 88b767517..43c4f36ba 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
@@ -45,4 +45,9 @@ public class ConfigKeyName {
     public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
     public static final String STATE_BACKEND = "execution.state.backend";
     public static final String PLANNER = "execution.planner";
+    public static final String METRICS_INTERVAL = "execution.metrics.interval";
+    public static final String METRICS_CLASS = "execution.metrics.class";
+    public static final String METRICS_PORT = "execution.metrics.port";
+    public static final String METRICS_JOB_NAME = "execution.metrics.jobName";
+    public static final String METRICS_HOST = "execution.metrics.host";
 }
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index 3e2d5e91b..0e53ddf5a 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -23,6 +23,7 @@ import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -100,6 +101,7 @@ public class SparkEnvironment implements RuntimeEnv {
     public SparkEnvironment prepare() {
         SparkConf sparkConf = createSparkConf();
         SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
+        createMetricBuilder(builder);
         if (enableHive) {
             builder.enableHiveSupport();
         }
@@ -178,6 +180,26 @@ public class SparkEnvironment implements RuntimeEnv {
         }
         return sink.output(fromDs, environment);
     }
+
+    private SparkSession.Builder createMetricBuilder(SparkSession.Builder builder) {
+        if (config.hasPath(CollectionConstants.METRICS_CLASS)) {
+            builder.config("spark.metrics.conf.*.sink.console.class", "org.apache.spark.seatunnel.metrics.sink.SeatunnelMetricSink");
+            if (config.hasPath(CollectionConstants.METRICS_HOST)) {
+                builder.config("spark.metrics.conf.*.sink.console.host", config.getString(CollectionConstants.METRICS_HOST));
+            }
+            if (config.hasPath(CollectionConstants.METRICS_PORT)) {
+                builder.config("spark.metrics.conf.*.sink.console.port", config.getString(CollectionConstants.METRICS_PORT));
+            }
+            if (config.hasPath(CollectionConstants.METRICS_JOB_NAME)) {
+                builder.config("spark.metrics.conf.*.sink.console.jobName", config.getString(CollectionConstants.METRICS_JOB_NAME));
+            }
+            if (config.hasPath(CollectionConstants.METRICS_INTERVAL)) {
+                builder.config("spark.metrics.conf.*.sink.console.interval", config.getString(CollectionConstants.METRICS_INTERVAL));
+            }
+            builder.config("spark.metrics.conf.*.sink.console.reporterName", config.getString(CollectionConstants.METRICS_CLASS));
+        }
+        return builder;
+    }
 }
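
With the same `seatunnel.metrics.*` keys set, the builder above maps them onto standard Spark metrics properties. The
net effect is equivalent to setting (a sketch; the host/port/jobName/interval values are illustrative):

```properties
spark.metrics.conf.*.sink.console.class=org.apache.spark.seatunnel.metrics.sink.SeatunnelMetricSink
spark.metrics.conf.*.sink.console.host=127.0.0.1
spark.metrics.conf.*.sink.console.port=9091
spark.metrics.conf.*.sink.console.jobName=fake_to_console
spark.metrics.conf.*.sink.console.interval=10
spark.metrics.conf.*.sink.console.reporterName=console
```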
 
 
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
index 28f5dc4d7..7f3f9c71b 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
@@ -20,4 +20,9 @@ package org.apache.seatunnel.common.constants;
 public class CollectionConstants {
 
     public static final int MAP_SIZE = 6;
+    public static final String METRICS_INTERVAL = "seatunnel.metrics.interval";
+    public static final String METRICS_CLASS = "seatunnel.metrics.class";
+    public static final String METRICS_PORT = "seatunnel.metrics.port";
+    public static final String METRICS_JOB_NAME = "seatunnel.metrics.jobName";
+    public static final String METRICS_HOST = "seatunnel.metrics.host";
 }
diff --git a/seatunnel-connectors-v2/README.md b/seatunnel-connectors-v2/README.md
index b24122d94..e6a6c6006 100644
--- a/seatunnel-connectors-v2/README.md
+++ b/seatunnel-connectors-v2/README.md
@@ -1,84 +1,180 @@
 # Purpose
-This article introduces the new interface and the new code structure on account of the newly designed API for Connectors in Apache SeaTunnel. This helps developers with quick overview regarding API, translation layer improvement, and development of new Connector.
+
+This article introduces the new interface and the new code structure based on the newly designed Connector API for
+Apache SeaTunnel. It helps developers quickly understand the API and the translation layer improvements, and it can
+guide contributors on how to use the new API to develop new connectors. See
+this [issue](https://github.com/apache/incubator-seatunnel/issues/1608) for details.
 
 ## **Code Structure**
-In order to separate from the old code, we have defined new modules for execution flow. This facilitates parallel development at the current stage, and reduces the difficulty of merging. All the relevant code at this stage is kept on the ``api-draft`` branch.
+
+In order to separate from the old code, we have defined new modules for execution flow. This facilitates parallel
+development at the current stage, and reduces the difficulty of merging.
 
 ### **Example**
-We have prepared a new version of the locally executable example program in ``seatunnel-examples``, which can be directly called using ``seatunnel-flink-connector-v2-example`` or ``seatunnel-spark-connector-v2-example`` in ``SeaTunnelApiExample``. This is also the debugging method that is often used in the local development of Connector. The corresponding configuration files are saved in the same module ``resources/examples`` folder as before.
 
+We have prepared two new locally executable example programs in `seatunnel-examples`. One
+is `seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`,
+which runs on the Flink engine. The other
+is `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`,
+which runs on the Spark engine. These are also the debugging entry points most often used during local connector
+development. Debugging these examples will help you better understand the running logic of the program. The
+configuration files used by the examples are saved in the `resources/examples` folder. If you want to add examples for
+your own connectors, follow the steps below.
+
+1. Add the groupId, artifactId and version of the connector to be tested to
+   `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or to
+   `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` when you want to run it on the Spark engine) as a
+   dependency, as sketched below.
+2. Find the dependencies in your connector's pom file whose scope is test or provided, add them to the
+   `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or
+   `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`) file, and change their scope to compile.
+3. Refer to the SeaTunnelApiExample class to develop your sample code.
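+
+A sketch of the dependency from step 1 (`connector-fake` is just an example artifactId; any connector module works the
+same way):
+
+```xml
+<dependency>
+    <groupId>org.apache.seatunnel</groupId>
+    <artifactId>connector-fake</artifactId>
+    <version>${revision}</version>
+</dependency>
+```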
 
 ### **Startup Class**
-Aside from the old startup class, we have created two new startup class projects, namely ``seatunnel-core/seatunnel-flink-starter`` and ``seatunnel-core/seatunnel-spark-starter``. You can find out how to parse the configuration file into an executable Flink/Spark process here.
+
+Aside from the old startup class, we have created two new startup modules,
+namely ``seatunnel-core/seatunnel-flink-starter`` and ``seatunnel-core/seatunnel-spark-starter``. You can find out how
+to parse the configuration file into an executable Flink/Spark process here.
 
 ### **SeaTunnel API**
-A new ``seatunnel-api`` (not ``seatunnel-apis``) module has been created to store the new interfaces defined by the SeaTunnel API. By implementing these interfaces, developers can complete the SeaTunnel Connector that supports multiple engines.
+
+A new ``seatunnel-api`` (not ``seatunnel-apis``) module has been created to store the new interfaces defined by the
+SeaTunnel API. By implementing these interfaces, developers can complete the SeaTunnel Connector that supports multiple
+engines.
 
 ### **Translation Layer**
-We realize the conversion between SeaTunnel API and Engine API by adapting the interfaces of different engines, so as to achieve the effect of translation, and let our SeaTunnel Connector support the operation of multiple different engines. The corresponding code address, ``seatunnel-translation``, this module has the corresponding translation layer implementation. If you are interested, you can view the code and help us improve the current code.
+
+We realize the conversion between the SeaTunnel API and the Engine API by adapting the interfaces of different engines,
+so as to achieve the effect of translation and let our SeaTunnel Connector run on multiple different engines. The
+corresponding code lives in ``seatunnel-translation``; this module contains the corresponding translation layer
+implementations. If you are interested, you can read the code and help us improve it.
 
 ## **API introduction**
+
 The API design of the current version of SeaTunnel draws on the design concept of Flink.
 
 ### **Source**
+
 #### **SeaTunnelSource.java**
-- The Source of SeaTunnel adopts the design of stream-batch integration, ``getBoundedness`` which determines whether the current Source is a stream Source or a batch Source, so you can specify a Source by dynamic configuration (refer to the default method), which can be either a stream or a batch.
-- ``getRowTypeInfo`` To get the schema of the data, the connector can choose to hard-code to implement a fixed schema, or run the user to customize the schema through config configuration. The latter is recommended.
-- SeaTunnelSource is a class executed on the driver side, through which objects such as SourceReader, SplitEnumerator and serializers are obtained.
+
+- The Source of SeaTunnel adopts a unified stream-batch design. ``getBoundedness`` determines whether the current
+  Source is a stream Source or a batch Source, so a Source can be declared through dynamic configuration (refer to the
+  default method) to act as either a stream or a batch.
+- ``getRowTypeInfo`` returns the schema of the data. The connector can either hard-code a fixed schema, or allow the
+  user to customize the schema through config; the latter is recommended.
+- SeaTunnelSource is a class executed on the driver side, through which objects such as SourceReader, SplitEnumerator
+  and serializers are obtained.
 - Currently, the data type supported by SeaTunnelSource must be SeaTunnelRow.
 
 #### **SourceSplitEnumerator.java**
-Use this enumerator to get the data read shard (SourceSplit) situation, different shards may be assigned to different SourceReaders to read data. Contains several key methods:
 
-- ``run``: Used to perform a spawn SourceSplit and call ``SourceSplitEnumerator.Context.assignSplit``: to distribute the shards to the SourceReader.
-- ``addSplitsBackSourceSplitEnumerator``: is required to redistribute these Splits when SourceSplit cannot be processed normally or restarted due to the exception of SourceReader.
-- ``registerReaderProcess``: some SourceReaders that are registered after the run is run. If there is no SourceSplit distributed at this time, it can be distributed to these new readers (yes, you need to maintain your SourceSplit distribution in SourceSplitEnumerator most of the time).
-- ``handleSplitRequest``: If some Readers actively request SourceSplit from SourceSplitEnumerator, this method can be called SourceSplitEnumerator.Context.assignSplit to sends shards to the corresponding Reader.
-- ``snapshotState``: It is used for stream processing to periodically return the current state that needs to be saved. If there is a state restoration, it will be called SeaTunnelSource.restoreEnumerator to constructs a SourceSplitEnumerator and restore the saved state to the SourceSplitEnumerator.
-- ``notifyCheckpointComplete``: It is used for subsequent processing after the state is successfully saved, and can be used to store the state or mark in third-party storage.
+Use this enumerator to get the sharding (SourceSplit) of the data to read; different shards may be assigned to
+different SourceReaders. It contains several key methods:
+
+- ``run``: Used to generate the SourceSplits and call ``SourceSplitEnumerator.Context.assignSplit`` to distribute the
+  shards to the SourceReaders.
+- ``addSplitsBack``: When a SourceReader exception prevents SourceSplits from being processed normally, or when the
+  job restarts, the SourceSplitEnumerator is required to redistribute these Splits.
+- ``registerReader``: Handles SourceReaders that register after ``run`` has already started. If there are SourceSplits
+  that have not been distributed yet, they can be distributed to these new readers (yes, you need to maintain your
+  SourceSplit distribution in SourceSplitEnumerator most of the time).
+- ``handleSplitRequest``: If some Readers actively request SourceSplits from the SourceSplitEnumerator, this method can
+  call ``SourceSplitEnumerator.Context.assignSplit`` to send shards to the corresponding Readers.
+- ``snapshotState``: Used in stream processing to periodically return the current state that needs to be saved. If
+  state is restored, ``SeaTunnelSource.restoreEnumerator`` will be called to construct a SourceSplitEnumerator and
+  restore the saved state to it.
+- ``notifyCheckpointComplete``: Used for subsequent processing after the state has been saved successfully, for
+  example storing the state or a marker in third-party storage.
 
 #### **SourceSplit.java**
-The interface used to save shards. Different shards need to define different splitIds. You can implement this interface to save the data that shards need to save, such as kafka's partition and topic, hbase's columnfamily and other information, which are used by SourceReader to determine Which part of the total data should be read.
+
+The interface used to save shards. Different shards need to define different splitIds. You can implement this interface
+to save the data a shard needs to carry, such as kafka's partition and topic, or hbase's columnfamily, which the
+SourceReader uses to determine which part of the total data it should read.
 
 #### **SourceReader.java**
-The interface that directly interacts with the data source, and the action of reading data from the data source is completed by implementing this interface.
-- ``pollNext``: It is the core of Reader. Through this interface, the process of reading the data of the data source and returning it to SeaTunnel is realized. Whenever you are ready to pass data to SeaTunnel, you can call the ``Collector.collect`` method in the parameter, which can be called an infinite number of times to complete a large amount of data reading. But the data format supported at this stage can only be ``SeaTunnelRow``. Because our Source is a stream-batch integration, th [...]
 
-        if ( Boundedness . BOUNDED . equals ( context . getBoundedness ())) 
-        {
-        // signal to the source that we have reached the end of the data. context . signalNoMoreElement ();
-        break ;
-        }
+The interface that directly interacts with the data source; the action of reading data from the data source is
+completed by implementing this interface.
+
+- ``pollNext``: It is the core of the Reader. Through this interface, the process of reading data from the data source
+  and returning it to SeaTunnel is realized. Whenever you are ready to pass data to SeaTunnel, you can call
+  the ``Collector.collect`` method in the parameter, which can be called an unlimited number of times to complete a
+  large amount of data reading. But the data format supported at this stage can only be ``SeaTunnelRow``. Because our
+  Source is stream-batch unified, the Connector has to decide when to end data reading in batch mode. For example, if a
+  batch reads 100 pieces of data at a time, then after reading is completed, ``pollNext`` needs to call
+  ``SourceReader.Context.signalNoMoreElement`` to notify SeaTunnel that there is no more data to read, and those 100
+  pieces of data can then be used for batch processing. Stream processing has no such requirement, so most stream-batch
+  unified SourceReaders will contain the following code:
+
+```java
+if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+    // signal to the source that we have reached the end of the data.
+    context.signalNoMoreElement();
+    break;
+}
+```
 
 It means that SeaTunnel will be notified only in batch mode.
 
-- ``addSplits``:  Used by the framework to assign SourceSplit to different SourceReaders, SourceReader should save the obtained shards, and then pollNextread the corresponding shard data in it, but there may be times when the Reader does not read shards (maybe SourceSplit has not been generated or The current Reader is indeed not allocated), at this time, pollNextcorresponding processing should be made, such as continuing to wait.
-- ``handleNoMoreSplits``: When triggered, it indicates that there are no more shards, and the Connector Source is required to optionally make corresponding feedback
-- ``snapshotStateIt``: is used for stream processing to periodically return the current state that needs to be saved, that is, the fragmentation information (SeaTunnel saves the fragmentation information and state together to achieve dynamic allocation).
-- ``notifyCheckpointComplete``: Like ``notifyCheckpointAborted`` the name, it is a callback for different states of checkpoint.
+- ``addSplits``: Used by the framework to assign SourceSplits to different SourceReaders. The SourceReader should save
+  the obtained shards and then read the corresponding shard data in ``pollNext``. There may be times when the Reader
+  has no shards to read (maybe the SourceSplits have not been generated yet, or none are currently allocated to this
+  Reader); in that case, ``pollNext`` should react accordingly, for example by continuing to wait.
+- ``handleNoMoreSplits``: When triggered, it indicates that there are no more shards; the Connector Source can
+  optionally react accordingly.
+- ``snapshotState``: Used in stream processing to periodically return the current state that needs to be saved, that
+  is, the shard information (SeaTunnel saves the shard information and state together to achieve dynamic allocation).
+- ``notifyCheckpointComplete``: Like ``notifyCheckpointAborted``, as the name implies, it is a callback for a different
+  checkpoint state.
 
 ### **Sink**
+
 #### **SeaTunnelSink.java**
-It is used to define the way to write data to the destination, and obtain instances such as ``SinkWriter`` and ``SinkCommitter`` through this interface. An important feature of the sink side is the processing of distributed transactions. SeaTunnel defines two different Committers: ``SinkCommitter`` used to process transactions for different subTasks ``SinkAggregatedCommitter``. Process transaction results for all nodes. Different Connector Sinks can be selected according to component pro [...]
+
+It is used to define how data is written to the destination, and to obtain instances such as ``SinkWriter``
+and ``SinkCommitter`` through this interface. An important feature of the sink side is the handling of distributed
+transactions. SeaTunnel defines two different Committers: ``SinkCommitter``, which processes transactions for the
+individual subTasks, and ``SinkAggregatedCommitter``, which processes the transaction results of all nodes on a single
+node. Depending on component properties, a Connector Sink can choose whether to implement only ``SinkCommitter``,
+only ``SinkAggregatedCommitter``, or both.
 
 #### **SinkWriter.java**
-It is used to directly interact with the output source, and provide the data obtained by SeaTunnel through the data source to the Writer for data writing.
 
-- ``write``: Responsible for transferring data to ``SinkWriter``, you can choose to write it directly, or write it after buffering a certain amount of data. Currently, only the data type is supported ``SeaTunnelRow``.
-- ``prepareCommit``: Executed before commit, you can write data directly here, or you can implement phase one in 2pc, and then implement phase two in ``SinkCommitter`` or ``SinkAggregatedCommitter``. What this method returns is the commit information, which will be provided ``SinkCommitter`` and ``SinkAggregatedCommitter`` used for the next stage of transaction processing.
+It is used to interact directly with the output destination, providing the data that SeaTunnel obtained from the data
+source to the Writer for writing.
+
+- ``write``: Responsible for transferring data into the ``SinkWriter``; you can choose to write it out directly, or
+  write it after buffering a certain amount of data. Currently, only the ``SeaTunnelRow`` data type is supported.
+- ``prepareCommit``: Executed before commit; you can write data directly here, or you can implement phase one of 2PC
+  here and then implement phase two in ``SinkCommitter`` or ``SinkAggregatedCommitter``. This method returns the
+  commit information, which will be provided to ``SinkCommitter`` and ``SinkAggregatedCommitter`` for the next stage
+  of transaction processing.
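+
+A sketch of the 2PC split across these methods (``FileCommitInfo`` and the helper names are hypothetical; only the
+method shapes follow the interface described above):
+
+```java
+@Override
+public void write(SeaTunnelRow element) throws IOException {
+    buffer.add(element); // buffer a certain amount of data before writing
+}
+
+@Override
+public Optional<FileCommitInfo> prepareCommit() throws IOException {
+    // phase one of 2PC: persist the buffered data to a temporary location
+    flushToTemporaryFile(buffer);
+    buffer.clear();
+    // the returned commit info is handed to SinkCommitter/SinkAggregatedCommitter for phase two
+    return Optional.of(new FileCommitInfo(tempPath, targetPath));
+}
+```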
 
 #### **SinkCommitter.java**
-It is used to process ``SinkWriter.prepareCommit`` the returned data information, including transaction information that needs to be submitted.
+
+It is used to process the data information returned by ``SinkWriter.prepareCommit``, including the transaction
+information that needs to be committed.
 
 #### **SinkAggregatedCommitter.java**
-It is used to process ``SinkWriter.prepareCommit`` the returned data information, including transaction information that needs to be submitted, etc., but it will be processed together on a single node, which can avoid the problem of inconsistency of the state caused by the failure of the second part of the stage.
 
-- ``combine``: Used ``SinkWriter.prepareCommit`` to aggregate the returned transaction information, and then generate aggregated transaction information.
+It is used to process the data information returned by ``SinkWriter.prepareCommit``, including the transaction
+information that needs to be committed, but it is processed together on a single node, which avoids state
+inconsistencies caused by partial failures of phase two.
+
+- ``combine``: Used to aggregate the transaction information returned by ``SinkWriter.prepareCommit``, and then
+  generate the aggregated transaction information.
 
 #### **Implement SinkCommitter or SinkAggregatedCommitter?**
-In the current version, it is recommended to implement ``SinkAggregatedCommitter`` as the first choice, which can provide strong consistency guarantee in Flink/Spark. At the same time, commit should be idempotent, and save engine retry can work normally.
+
+In the current version, implementing ``SinkAggregatedCommitter`` is recommended as the first choice; it can provide a
+strong consistency guarantee in Flink/Spark. At the same time, commit should be idempotent so that engine retries can
+work normally.
 
 ## **Result**
-All Connector implementations should be under the ``seatunnel-connectors/seatunnel-connectors-seatuunelmodule``, and the examples that can be referred to at this stage are under this module.
+
+All Connector implementations should be under the ``seatunnel-connectors-v2`` module, and the examples that can be
+referred to at this stage are under this module as well.
 
 
diff --git a/seatunnel-connectors-v2/README.zh.md b/seatunnel-connectors-v2/README.zh.md
index 017b19907..02a59af2e 100644
--- a/seatunnel-connectors-v2/README.zh.md
+++ b/seatunnel-connectors-v2/README.zh.md
@@ -1,64 +1,127 @@
 ## 目的
-Because SeaTunnel design new API for connectors, 所以通过这篇文章来介绍新的接口以及新的代码结构,方便开发者快速的帮助新API和翻译层完善,以及开发出新的Connecotor.
+
+SeaTunnel为与计算引擎进行解耦,设计了新的连接器API,通过这篇文章来介绍新的接口以及新的代码结构,方便开发者快速上手使用新版API开发连接器并理解新版API运行原理.
+详细设计请查看该[提议](https://github.com/apache/incubator-seatunnel/issues/1608) 。
+
 ## 代码结构
-现阶段所有相关代码保存在`api-draft`分支上。
+
 为了和老的代码分开,方便现阶段的并行开发,以及降低merge的难度。我们为新的执行流程定义了新的模块
+
 ### Example
-我们已经在`seatunnel-examples`中准备好了新版本的可本地执行Example程序,直接调用`seatunnel-flink-connector-v2-example`或`seatunnel-spark-connector-v2-example`中的`SeaTunnelApiExample`即可。这也是本地开发Connector经常会用到的调试方式。
-对应的配置文件保存在同模块的`resources/examples`文件夹下,和以前一样。
+
+我们已经在`seatunnel-examples`
+准备了两个本地可执行的案例程序,其中一个是`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`
+,它运行在flink引擎上。另外一个是`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`
+,它运行在spark引擎上。你可以通过调试这些例子帮你更好的理解程序运行逻辑。使用的配置文件保存在`resources/examples`文件夹里。如果你想增加自己的connectors,你需要按照下面的步骤。
+
+1. 在`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`添加connector依赖的groupId, artifactId 和
+   version.(或者当你想在spark引擎运行时在`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`添加依赖)
+2. 如果你的connector中存在scope为test或provided的依赖,将这些依赖添加到seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml(
+   或者在seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml)中,并且修改scope为compile.
+3. 参考`SeaTunnelApiExample`开发自己的案例程序。
+
 ### 启动类
-和老的启动类分开,我们创建了两个新的启动类工程,分别是`seatunnel-core/seatunnel-flink-starter`和`seatunnel-core/seatunnel-spark-starter`. 可以在这里找到如何将配置文件解析为可以执行的Flink/Spark流程。
+
+和老的启动类分开,我们创建了两个新的启动类工程,分别是`seatunnel-core/seatunnel-flink-starter`和`seatunnel-core/seatunnel-spark-starter`.
+可以在这里找到如何将配置文件解析为可以执行的Flink/Spark流程。
+
 ### SeaTunnel API
+
 新建了一个`seatunnel-api`(不是`seatunnel-apis`)模块,用于存放SeaTunnel API定义的新接口, 开发者通过对这些接口进行实现,就可以完成支持多引擎的SeaTunnel Connector
+
 ### 翻译层
-我们通过适配不同引擎的接口,实现SeaTunnel API和Engine API的转换,从而达到翻译的效果,让我们的SeaTunnel Connector支持多个不同引擎的运行。
-对应代码地址为`seatunnel-translation`,该模块有对应的翻译层实现。感兴趣可以查看代码,帮助我们完善当前代码。
+
+我们通过适配不同引擎的接口,实现SeaTunnel API和Engine API的转换,从而达到翻译的效果,让我们的SeaTunnel Connector支持多个不同引擎的运行。 对应代码地址为`seatunnel-translation`
+,该模块有对应的翻译层实现。感兴趣可以查看代码,帮助我们完善当前代码。
+
 ## API 介绍
+
 `SeaTunnel 当前版本的API设计借鉴了Flink的设计理念`
+
 ### Source
+
 #### SeaTunnelSource.java
-- SeaTunnel的Source采用流批一体的设计,通过`getBoundedness`来决定当前Source是流Source还是批Source,所以可以通过动态配置的方式(参考default方法)来指定一个Source既可以为流,也可以为批。
+
+- SeaTunnel的Source采用流批一体的设计,通过`getBoundedness`
+  来决定当前Source是流Source还是批Source,所以可以通过动态配置的方式(参考default方法)来指定一个Source既可以为流,也可以为批。
 - `getRowTypeInfo`来得到数据的schema,connector可以选择硬编码来实现固定的schema,或者运行用户通过config配置来自定义schema,推荐后者。
 - SeaTunnelSource是执行在driver端的类,通过该类,来获取SourceReader,SplitEnumerator等对象以及序列化器。
 - 目前SeaTunnelSource支持的生产的数据类型必须是SeaTunnelRow类型。
+
 #### SourceSplitEnumerator.java
+
 通过该枚举器来获取数据读取的分片(SourceSplit)情况,不同的分片可能会分配给不同的SourceReader来读取数据。包含几个关键方法:
+
 - `run`用于执行产生SourceSplit并调用`SourceSplitEnumerator.Context.assignSplit`来将分片分发给SourceReader。
 - `addSplitsBack`用于处理SourceReader异常导致SourceSplit无法正常处理或者重启时,需要SourceSplitEnumerator对这些Split进行重新分发。
-- `registerReader`处理一些在run运行了之后才注册上的SourceReader,如果这个时候还没有分发下去的SourceSplit,就可以分发给这些新的Reader(对,你大多数时候需要在SourceSplitEnumerator里面维护你的SourceSplit分发情况)
-- `handleSplitRequest`如果有些Reader主动向SourceSplitEnumerator请求SourceSplit,那么可以通过该方法调用`SourceSplitEnumerator.Context.assignSplit`来向对应的Reader发送分片。
-- `snapshotState`用于流处理定时返回需要保存的当前状态,如果有状态恢复时,会调用`SeaTunnelSource.restoreEnumerator`来构造SourceSplitEnumerator,将保存的状态恢复给SourceSplitEnumerator。
+- `registerReader`
+  处理一些在run运行了之后才注册上的SourceReader,如果这个时候还没有分发下去的SourceSplit,就可以分发给这些新的Reader(对,你大多数时候需要在SourceSplitEnumerator里面维护你的SourceSplit分发情况)
+- `handleSplitRequest`
+  如果有些Reader主动向SourceSplitEnumerator请求SourceSplit,那么可以通过该方法调用`SourceSplitEnumerator.Context.assignSplit`来向对应的Reader发送分片。
+- `snapshotState`用于流处理定时返回需要保存的当前状态,如果有状态恢复时,会调用`SeaTunnelSource.restoreEnumerator`
+  来构造SourceSplitEnumerator,将保存的状态恢复给SourceSplitEnumerator。
 - `notifyCheckpointComplete`用于状态保存成功后的后续处理,可以用于将状态或者标记存入第三方存储。
+
 #### SourceSplit.java
+
 用于保存分片的接口,不同的分片需要定义不同的splitId,可以通过实现这个接口,保存分片需要保存的数据,比如kafka的partition和topic,hbase的columnfamily等信息,用于SourceReader来确定应该读取全部数据的哪一部分。
+
 #### SourceReader.java
+
 直接和数据源进行交互的接口,通过实现该接口完成从数据源读取数据的动作。
-- `pollNext`便是Reader的核心,通过这个接口,实现读取数据源的数据然后返回给SeaTunnel的流程。每当准备将数据传递给SeaTunnel时,就可以调用参数中的`Collector.collect`方法,可以无限次的调用该方法完成数据的大量读取。但是现阶段支持的数据格式只能是`SeaTunnelRow`。因为我们的Source是流批一体的,所以批模式的时候Connector要自己决定什么时候结束数据读取,比如批处理一次读取100条数据,读取完成后需要在`pollNext`中调用`SourceReader.Context.signalNoMoreElement`通知SeaTunnel没有数据读取了,那么就可以利用这100条数据进行批处理。流处理没有这个要求,那么大多数流批一体的SourceReader都会出现如下代码:
+
+- `pollNext`便是Reader的核心,通过这个接口,实现读取数据源的数据然后返回给SeaTunnel的流程。每当准备将数据传递给SeaTunnel时,就可以调用参数中的`Collector.collect`
+  方法,可以无限次的调用该方法完成数据的大量读取。但是现阶段支持的数据格式只能是`SeaTunnelRow`
+  。因为我们的Source是流批一体的,所以批模式的时候Connector要自己决定什么时候结束数据读取,比如批处理一次读取100条数据,读取完成后需要在`pollNext`
+  中调用`SourceReader.Context.signalNoMoreElement`
+  通知SeaTunnel没有数据读取了,那么就可以利用这100条数据进行批处理。流处理没有这个要求,那么大多数流批一体的SourceReader都会出现如下代码:
+
 ```java
 if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
     // signal to the source that we have reached the end of the data.
     context.signalNoMoreElement();
     break;
 }
 ```
+
 代表着只有批模式的时候才会通知SeaTunnel。
-- `addSplits`用于框架将SourceSplit分配给不同的SourceReader,SourceReader应该将得到的分片保存起来,然后在`pollNext`中读取对应的分片数据,但是可能出现Reader没有分片读取的时候(可能SourceSplit还没生成或者当前Reader确实分配不到),这个时候`pollNext`应该做出对应的处理,比如继续等待。
+
+- `addSplits`用于框架将SourceSplit分配给不同的SourceReader,SourceReader应该将得到的分片保存起来,然后在`pollNext`
+  中读取对应的分片数据,但是可能出现Reader没有分片读取的时候(可能SourceSplit还没生成或者当前Reader确实分配不到),这个时候`pollNext`应该做出对应的处理,比如继续等待。
 - `handleNoMoreSplits`触发时表示没有更多分片,需要Connector Source可选的做出相应的反馈
 - `snapshotState`用于流处理定时返回需要保存的当前状态,也就是分片信息(SeaTunnel将分片信息和状态保存在一起,实现动态分配)。
 - `notifyCheckpointComplete`和`notifyCheckpointAborted`和名字一样,是checkpoint不同状态下的回调。
+
 ### Sink
+
 #### SeaTunnelSink.java
-用于定义数据写入目标端的方式,通过该接口来实现获取SinkWriter和SinkCommitter等实例。Sink端有一个重要特性就是分布式事务的处理,SeaTunnel定义了两种不同的Committer:`SinkCommitter`用于处理针对不同的subTask进行事务的处理,每个subTask处理各自的事务,然后成功后再由`SinkAggregatedCommitter`单线程的处理所有节点的事务结果。不同的Connector Sink可以根据组件属性进行选择,到底是只实现`SinkCommitter`或`SinkAggregatedCommitter`,还是都实现。
+
+用于定义数据写入目标端的方式,通过该接口来实现获取SinkWriter和SinkCommitter等实例。Sink端有一个重要特性就是分布式事务的处理,SeaTunnel定义了两种不同的Committer:`SinkCommitter`
+用于处理针对不同的subTask进行事务的处理,每个subTask处理各自的事务,然后成功后再由`SinkAggregatedCommitter`单线程的处理所有节点的事务结果。不同的Connector
+Sink可以根据组件属性进行选择,到底是只实现`SinkCommitter`或`SinkAggregatedCommitter`,还是都实现。
+
 #### SinkWriter.java
+
 用于直接和输出源进行交互,将SeaTunnel通过数据源取得的数据提供给Writer进行数据写入。
+
 - `write` 负责将数据传入SinkWriter,可以选择直接写入,或者缓存到一定数据后再写入,目前数据类型只支持`SeaTunnelRow`。
-- `prepareCommit` 在commit之前执行,可以在这直接写入数据,也可以实现2pc中的阶段一,然后在`SinkCommitter`或`SinkAggregatedCommitter`中实现阶段二。该方法返回的就是commit信息,将会提供给`SinkCommitter`和`SinkAggregatedCommitter`用于下一阶段事务处理。
+- `prepareCommit` 在commit之前执行,可以在这直接写入数据,也可以实现2pc中的阶段一,然后在`SinkCommitter`或`SinkAggregatedCommitter`
+  中实现阶段二。该方法返回的就是commit信息,将会提供给`SinkCommitter`和`SinkAggregatedCommitter`用于下一阶段事务处理。
+
 #### SinkCommitter.java
+
 用于处理`SinkWriter.prepareCommit`返回的数据信息,包含需要提交的事务信息等。
+
 #### SinkAggregatedCommitter.java
+
 用于处理`SinkWriter.prepareCommit`返回的数据信息,包含需要提交的事务信息等,但是会在单个节点一起处理,这样可以避免阶段二部分失败导致状态不一致的问题。
+
 - `combine` 用于将`SinkWriter.prepareCommit`返回的事务信息进行聚合,然后生成聚合的事务信息。
+
 #### 我应该实现SinkCommitter还是SinkAggregatedCommitter?
+
 当前版本推荐将实现SinkAggregatedCommitter作为首选,可以在Flink/Spark中提供较强的一致性保证,同时commit应该要实现幂等性,保存引擎重试能够正常运作。
+
 ## 实现
-所有的Connector实现都应该在`seatunnel-connectors/seatunnel-connectors-seatuunel`模块下,现阶段可参考的示例均在此模块下。
\ No newline at end of file
+
+现阶段所有的连接器实现及可参考的示例都在seatunnel-connectors-v2下,用户可自行查阅参考。
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
similarity index 55%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
copy to seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
index 28f5dc4d7..96dc5e8ac 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/CollectionConstants.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeOptions.java
@@ -15,9 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.constants;
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
-public class CollectionConstants {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-    public static final int MAP_SIZE = 6;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+public class FakeOptions implements Serializable {
+
+    private static final String ROW_NUM = "row.num";
+    private static final Long DEFAULT_ROW_NUM = 10L;
+    @Getter
+    @Setter
+    private Long rowNum;
+
+    public static FakeOptions parse(Config config) {
+        FakeOptions fakeOptions = new FakeOptions();
+        fakeOptions.setRowNum(config.hasPath(ROW_NUM) ? config.getLong(ROW_NUM) : DEFAULT_ROW_NUM);
+        return fakeOptions;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index fd9387f2d..eeede4439 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -38,6 +38,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     private Config pluginConfig;
     private JobContext jobContext;
     private SeaTunnelSchema schema;
+    private FakeOptions fakeOptions;
 
     @Override
     public Boundedness getBoundedness() {
@@ -51,7 +52,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
-        return new FakeSourceReader(readerContext, new FakeRandomData(schema));
+        return new FakeSourceReader(readerContext, new FakeRandomData(schema), fakeOptions);
     }
 
     @Override
@@ -64,6 +65,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
         this.pluginConfig = pluginConfig;
         assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
         this.schema = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
+        this.fakeOptions = FakeOptions.parse(pluginConfig);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index d67e1f4b2..6301a284f 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -33,10 +33,12 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     private final SingleSplitReaderContext context;
 
     private final FakeRandomData fakeRandomData;
+    private final FakeOptions options;
 
-    public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
+    public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData, FakeOptions options) {
         this.context = context;
         this.fakeRandomData = randomData;
+        this.options = options;
     }
 
     @Override
@@ -53,7 +55,7 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
         // Generate a random number of rows to emit.
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < options.getRowNum(); i++) {
             SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
             output.collect(seaTunnelRow);
         }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 3b9f359ee..a34b2b7dd 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -17,9 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 
 import lombok.NonNull;
@@ -28,16 +32,29 @@ import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
 import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class OrcWriteStrategy extends AbstractWriteStrategy {
@@ -109,37 +126,53 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
     }
 
     private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
-        if (BasicType.BOOLEAN_TYPE.equals(type)) {
-            return TypeDescription.createBoolean();
+        switch (type.getSqlType()) {
+            case ARRAY:
+                BasicType<?> elementType = ((ArrayType<?, ?>) type).getElementType();
+                return TypeDescription.createList(buildFieldWithRowType(elementType));
+            case MAP:
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) type).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) type).getValueType();
+                return TypeDescription.createMap(buildFieldWithRowType(keyType), buildFieldWithRowType(valueType));
+            case STRING:
+                return TypeDescription.createString();
+            case BOOLEAN:
+                return TypeDescription.createBoolean();
+            case TINYINT:
+                return TypeDescription.createByte();
+            case SMALLINT:
+                return TypeDescription.createShort();
+            case INT:
+                return TypeDescription.createInt();
+            case BIGINT:
+                return TypeDescription.createLong();
+            case FLOAT:
+                return TypeDescription.createFloat();
+            case DOUBLE:
+                return TypeDescription.createDouble();
+            case DECIMAL:
+                int precision = ((DecimalType) type).getPrecision();
+                int scale = ((DecimalType) type).getScale();
+                return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
+            case BYTES:
+                return TypeDescription.createBinary();
+            case DATE:
+                return TypeDescription.createDate();
+            case TIME:
+            case TIMESTAMP:
+                return TypeDescription.createTimestamp();
+            case ROW:
+                TypeDescription struct = TypeDescription.createStruct();
+                SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) type).getFieldTypes();
+                for (int i = 0; i < fieldTypes.length; i++) {
+                    struct.addField(((SeaTunnelRowType) type).getFieldName(i), buildFieldWithRowType(fieldTypes[i]));
+                }
+                return struct;
+            case NULL:
+            default:
+                String errorMsg = String.format("Orc file does not support this type [%s]", type.getSqlType());
+                throw new UnsupportedOperationException(errorMsg);
         }
-        if (BasicType.SHORT_TYPE.equals(type)) {
-            return TypeDescription.createShort();
-        }
-        if (BasicType.INT_TYPE.equals(type)) {
-            return TypeDescription.createInt();
-        }
-        if (BasicType.LONG_TYPE.equals(type)) {
-            return TypeDescription.createLong();
-        }
-        if (BasicType.FLOAT_TYPE.equals(type)) {
-            return TypeDescription.createFloat();
-        }
-        if (BasicType.DOUBLE_TYPE.equals(type)) {
-            return TypeDescription.createDouble();
-        }
-        if (BasicType.BYTE_TYPE.equals(type)) {
-            return TypeDescription.createByte();
-        }
-        if (BasicType.STRING_TYPE.equals(type)) {
-            return TypeDescription.createString();
-        }
-        if (BasicType.VOID_TYPE.equals(type)) {
-            return TypeDescription.createString();
-        }
-
-        // TODO map struct array
-
-        return TypeDescription.createString();
     }
 
     private TypeDescription buildSchemaWithRowType() {
@@ -169,9 +202,101 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
                     BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
                     setByteColumnVector(value, bytesColumnVector, row);
                     break;
+                case DECIMAL:
+                    DecimalColumnVector decimalColumnVector = (DecimalColumnVector) vector;
+                    setDecimalColumnVector(value, decimalColumnVector, row);
+                    break;
+                case TIMESTAMP:
+                    TimestampColumnVector timestampColumnVector = (TimestampColumnVector) vector;
+                    setTimestampColumnVector(value, timestampColumnVector, row);
+                    break;
+                case LIST:
+                    ListColumnVector listColumnVector = (ListColumnVector) vector;
+                    setListColumnVector(value, listColumnVector, row);
+                    break;
+                case MAP:
+                    MapColumnVector mapColumnVector = (MapColumnVector) vector;
+                    setMapColumnVector(value, mapColumnVector, row);
+                    break;
+                case STRUCT:
+                    StructColumnVector structColumnVector = (StructColumnVector) vector;
+                    setStructColumnVector(value, structColumnVector, row);
+                    break;
                 default:
-                    throw new RuntimeException("Unexpected ColumnVector subtype");
+                    throw new RuntimeException("Unexpected ColumnVector subtype " + vector.type);
+            }
+        }
+    }
+
+    private void setStructColumnVector(Object value, StructColumnVector structColumnVector, int row) {
+        if (value instanceof SeaTunnelRow) {
+            SeaTunnelRow seaTunnelRow = (SeaTunnelRow) value;
+            Object[] fields = seaTunnelRow.getFields();
+            for (int i = 0; i < fields.length; i++) {
+                setColumn(fields[i], structColumnVector.fields[i], row);
+            }
+        } else {
+            throw new RuntimeException("SeaTunnelRow type expected for field");
+        }
+
+    }
+
+    private void setMapColumnVector(Object value, MapColumnVector mapColumnVector, int row) {
+        if (value instanceof Map) {
+            Map<?, ?> map = (Map<?, ?>) value;
+
+            mapColumnVector.offsets[row] = mapColumnVector.childCount;
+            mapColumnVector.lengths[row] = map.size();
+            mapColumnVector.childCount += map.size();
+
+            int i = 0;
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                int mapElem = (int) mapColumnVector.offsets[row] + i;
+                setColumn(entry.getKey(), mapColumnVector.keys, mapElem);
+                setColumn(entry.getValue(), mapColumnVector.values, mapElem);
+                ++i;
             }
+        } else {
+            throw new RuntimeException("Map type expected for field");
+        }
+    }
+
+    private void setListColumnVector(Object value, ListColumnVector listColumnVector, int row) {
+        Object[] valueArray;
+        if (value instanceof Object[]) {
+            valueArray = (Object[]) value;
+        } else if (value instanceof List) {
+            valueArray = ((List<?>) value).toArray();
+        } else {
+            throw new RuntimeException("List and Array type expected for field");
+        }
+        listColumnVector.offsets[row] = listColumnVector.childCount;
+        listColumnVector.lengths[row] = valueArray.length;
+        listColumnVector.childCount += valueArray.length;
+
+        for (int i = 0; i < valueArray.length; i++) {
+            int listElem = (int) listColumnVector.offsets[row] + i;
+            setColumn(valueArray[i], listColumnVector.child, listElem);
+        }
+    }
+
+    private void setDecimalColumnVector(Object value, DecimalColumnVector decimalColumnVector, int row) {
+        if (value instanceof BigDecimal) {
+            decimalColumnVector.set(row, HiveDecimal.create((BigDecimal) value));
+        } else {
+            throw new RuntimeException("BigDecimal type expected for field");
+        }
+    }
+
+    private void setTimestampColumnVector(Object value, TimestampColumnVector timestampColumnVector, int row) {
+        if (value instanceof Timestamp) {
+            timestampColumnVector.set(row, (Timestamp) value);
+        } else if (value instanceof LocalDateTime) {
+            timestampColumnVector.set(row, Timestamp.valueOf((LocalDateTime) value));
+        } else if (value instanceof LocalTime) {
+            timestampColumnVector.set(row, Timestamp.valueOf(((LocalTime) value).atDate(LocalDate.ofEpochDay(0))));
+        } else {
+            throw new RuntimeException("Time series type expected for field");
         }
     }
 
@@ -186,10 +311,12 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
         } else if (value instanceof BigInteger) {
             BigInteger bigInt = (BigInteger) value;
             longVector.vector[row] = bigInt.longValue();
-        } else if (value instanceof Short) {
-            longVector.vector[row] = (Short) value;
         } else if (value instanceof Byte) {
             longVector.vector[row] = (Byte) value;
+        } else if (value instanceof Short) {
+            longVector.vector[row] = (Short) value;
+        } else if (value instanceof LocalDate) {
+            longVector.vector[row] = ((LocalDate) value).getLong(ChronoField.EPOCH_DAY);
         } else {
             throw new RuntimeException("Long or Integer type expected for field");
         }
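
The switch-based mapping above replaces the old BasicType if-chain and adds the complex types. As a reference point, a row schema like the c_map/c_array/c_decimal fields in fakesource_to_local_orc.conf below would be translated into an ORC schema along these lines (a standalone sketch against the org.apache.orc.TypeDescription API, not part of the commit):

    import org.apache.orc.TypeDescription;

    public class OrcSchemaSketch {
        public static void main(String[] args) {
            TypeDescription schema = TypeDescription.createStruct()
                .addField("c_map", TypeDescription.createMap(
                    TypeDescription.createString(), TypeDescription.createString()))
                .addField("c_array", TypeDescription.createList(TypeDescription.createByte()))
                .addField("c_decimal", TypeDescription.createDecimal()
                    .withPrecision(30).withScale(8));
            // Prints: struct<c_map:map<string,string>,c_array:array<tinyint>,c_decimal:decimal(30,8)>
            System.out.println(schema);
        }
    }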
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 4b90f8cb3..2def75ba7 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -40,10 +40,16 @@ public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
     protected final SerializationSchema serializationSchema;
 
     public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter httpParameter) {
+        this(seaTunnelRowType, httpParameter, new JsonSerializationSchema(seaTunnelRowType));
+    }
+
+    public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType,
+                          HttpParameter httpParameter,
+                          SerializationSchema serializationSchema) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
         this.httpClient = new HttpClientProvider(httpParameter);
-        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
+        this.serializationSchema = serializationSchema;
     }
 
     @Override
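
The new overloaded constructor lets callers inject any payload format instead of the default JSON. A hypothetical plain-text schema, sketched against the SerializationSchema contract used in this commit (the class name and format are illustrative only):

    import org.apache.seatunnel.api.serialization.SerializationSchema;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;

    import java.nio.charset.StandardCharsets;
    import java.util.StringJoiner;

    public class CsvSerializationSchema implements SerializationSchema {
        @Override
        public byte[] serialize(SeaTunnelRow row) {
            // Join field values with commas; a real implementation would quote and escape.
            StringJoiner joiner = new StringJoiner(",");
            for (Object field : row.getFields()) {
                joiner.add(String.valueOf(field));
            }
            return joiner.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

It would be wired in via new HttpSinkWriter(rowType, httpParameter, new CsvSerializationSchema()).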
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
new file mode 100644
index 000000000..13bb7ddce
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatBotMessageSerializationSchema.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import lombok.SneakyThrows;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WeChatBotMessageSerializationSchema implements SerializationSchema {
+    private final WeChatSinkConfig weChatSinkConfig;
+    private final SeaTunnelRowType rowType;
+    private final JsonSerializationSchema jsonSerializationSchema;
+
+    public WeChatBotMessageSerializationSchema(WeChatSinkConfig weChatSinkConfig,
+                                               SeaTunnelRowType rowType) {
+        this.weChatSinkConfig = weChatSinkConfig;
+        this.rowType = rowType;
+        this.jsonSerializationSchema = new JsonSerializationSchema(rowType);
+    }
+
+    @SneakyThrows
+    @Override
+    public byte[] serialize(SeaTunnelRow row) {
+        StringBuilder contentBuilder = new StringBuilder();
+        int totalFields = rowType.getTotalFields();
+        for (int i = 0; i < totalFields; i++) {
+            contentBuilder.append(rowType.getFieldName(i)).append(": ").append(row.getField(i)).append("\\n");
+        }
+        if (totalFields > 0) {
+            // strip the trailing two-character "\n" separator
+            contentBuilder.delete(contentBuilder.length() - 2, contentBuilder.length());
+        }
+
+        HashMap<Object, Object> content = new HashMap<>();
+        content.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY, contentBuilder.toString());
+        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
+            content.put(WeChatSinkConfig.MENTIONED_LIST, weChatSinkConfig.getMentionedList());
+        }
+        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
+            content.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST, weChatSinkConfig.getMentionedMobileList());
+        }
+
+        Map<String, Object> wechatMessage = new HashMap<>();
+        wechatMessage.put(WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY, WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE);
+        wechatMessage.put(WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, content);
+        return jsonSerializationSchema.getMapper().writeValueAsBytes(wechatMessage);
+    }
+}
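
For a two-field row, the serializer above would produce a payload roughly like the following (illustrative values; the actual key strings come from WeChatSinkConfig constants not shown in this diff, and are assumed here to follow the WeChat Work webhook convention):

    {
      "msgtype": "text",
      "text": {
        "content": "name: Alice\nage: 18",
        "mentioned_list": ["@all"]
      }
    }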
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
deleted file mode 100644
index 44ecab013..000000000
--- a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatHttpSinkWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
-import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
-import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-public class WeChatHttpSinkWriter extends HttpSinkWriter {
-
-    private final WeChatSinkConfig weChatSinkConfig;
-    private final SeaTunnelRowType seaTunnelRowType;
-
-    public WeChatHttpSinkWriter(HttpParameter httpParameter,  Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        //new SeaTunnelRowType can match SeaTunnelRowWrapper fields sequence
-        super(new SeaTunnelRowType(new String[]{WeChatSinkConfig.WECHAT_SEND_MSG_TYPE_KEY, WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE}, new SeaTunnelDataType[]{BasicType.VOID_TYPE, BasicType.VOID_TYPE}), httpParameter);
-        this.weChatSinkConfig = new WeChatSinkConfig(pluginConfig);
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    @Override
-    public void write(SeaTunnelRow element) throws IOException {
-        StringBuffer stringBuffer = new StringBuffer();
-        int totalFields = seaTunnelRowType.getTotalFields();
-        for (int i = 0; i < totalFields; i++) {
-            stringBuffer.append(seaTunnelRowType.getFieldName(i) + ": " + element.getField(i) + "\\n");
-        }
-        if (totalFields > 0) {
-            //remove last empty line
-            stringBuffer.delete(stringBuffer.length() - 2, stringBuffer.length());
-        }
-        HashMap<Object, Object> objectMap = new HashMap<>();
-        objectMap.put(WeChatSinkConfig.WECHAT_SEND_MSG_CONTENT_KEY, stringBuffer.toString());
-        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedList())) {
-            objectMap.put(WeChatSinkConfig.MENTIONED_LIST, weChatSinkConfig.getMentionedList());
-        }
-        if (!CollectionUtils.isEmpty(weChatSinkConfig.getMentionedMobileList())) {
-            objectMap.put(WeChatSinkConfig.MENTIONED_MOBILE_LIST, weChatSinkConfig.getMentionedMobileList());
-        }
-        //SeaTunnelRowWrapper can used to post wechat web hook
-        SeaTunnelRow wechatRowWrapper = new SeaTunnelRow(new Object[]{WeChatSinkConfig.WECHAT_SEND_MSG_SUPPORT_TYPE, objectMap});
-        super.write(wechatRowWrapper);
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
index 64b6d97b4..908fdcf10 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -17,15 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.wechat.sink;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
 
 import com.google.auto.service.AutoService;
 
@@ -37,24 +35,9 @@ public class WeChatSink extends HttpSink {
         return "WeChat";
     }
 
-    private Config pluginConfig;
-
-    private SeaTunnelRowType seaTunnelRowType;
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        super.prepare(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-        super.setTypeInfo(seaTunnelRowType);
-    }
-
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
-        return new WeChatHttpSinkWriter(super.httpParameter, pluginConfig, seaTunnelRowType);
+        return new HttpSinkWriter(seaTunnelRowType, super.httpParameter,
+            new WeChatBotMessageSerializationSchema(new WeChatSinkConfig(pluginConfig), seaTunnelRowType));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index f3b5368e9..71de5d176 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -22,6 +22,9 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,20 +32,34 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
-
-    SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    List<JdbcSourceSplit> allSplit = new ArrayList<>();
-    JdbcSourceOptions jdbcSourceOptions;
-    PartitionParameter partitionParameter;
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
+    private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
+    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+    private JdbcSourceOptions jdbcSourceOptions;
+    private final PartitionParameter partitionParameter;
+    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
+        this.parallelism = enumeratorContext.currentParallelism();
     }
 
     @Override
     public void open() {
+        LOG.info("Starting to calculate splits.");
+        if (null != partitionParameter) {
+            JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
+                    new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+            Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
+            for (int i = 0; i < parameterValues.length; i++) {
+                allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
+            }
+        } else {
+            allSplit.add(new JdbcSourceSplit(null, 0));
+        }
+        LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
     @Override
@@ -70,20 +87,10 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
 
     @Override
     public void registerReader(int subtaskId) {
-        int parallelism = enumeratorContext.currentParallelism();
-        if (allSplit.isEmpty()) {
-            if (null != partitionParameter) {
-                JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
-                Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
-                for (int i = 0; i < parameterValues.length; i++) {
-                    allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
-                }
-            } else {
-                allSplit.add(new JdbcSourceSplit(null, 0));
-            }
-        }
         // Filter the split that the current task needs to run
-        List<JdbcSourceSplit> splits = allSplit.stream().filter(p -> p.splitId % parallelism == subtaskId).collect(Collectors.toList());
+        List<JdbcSourceSplit> splits = allSplit.stream()
+                .filter(p -> p.splitId % parallelism == subtaskId)
+                .collect(Collectors.toList());
         enumeratorContext.assignSplit(subtaskId, splits);
         enumeratorContext.signalNoMoreSplits(subtaskId);
     }
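
With splits computed once in open(), registerReader() reduces to the modulo filter: for parallelism 3 and six splits, subtask 0 is assigned splits {0, 3}, subtask 1 gets {1, 4}, and subtask 2 gets {2, 5}. A self-contained sketch of that assignment rule:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class SplitAssignmentSketch {
        public static void main(String[] args) {
            int parallelism = 3;
            int subtaskId = 1;
            // Stand-in for allSplit: split ids 0..5.
            List<Integer> assigned = IntStream.range(0, 6).boxed()
                .filter(splitId -> splitId % parallelism == subtaskId)
                .collect(Collectors.toList());
            System.out.println(assigned); // [1, 4]
        }
    }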
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 1c37dfd77..ef92d4152 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -26,6 +29,9 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -50,6 +56,10 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVERS);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
         this.pluginConfig = pluginConfig;
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 3f71c3085..b577067ef 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
@@ -136,7 +137,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        return new DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"), seaTunnelRowType);
+        return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
     }
 
     private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
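
After this change a Kafka sink config must declare both options up front, otherwise prepare() fails fast with a PrepareFailException. A minimal sketch, assuming the TOPIC and BOOTSTRAP_SERVERS constants resolve to the key strings "topic" and "bootstrap.servers" (the constant values are not shown in this diff):

    sink {
      Kafka {
        topic = "seatunnel-topic"
        bootstrap.servers = "localhost:9092"
      }
    }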
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index a34426b12..0a67115e4 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -48,6 +48,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-flink</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
         <!-- todo: use another module to execute new API?        -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
index 91dfbf939..309e07e09 100644
--- a/seatunnel-core/seatunnel-spark-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -47,6 +47,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-spark</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 185ca71db..89be47204 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -173,5 +173,17 @@
             <outputDirectory>/connectors/spark</outputDirectory>
             <scope>provided</scope>
         </dependencySet>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <includes>
+                <include>org.apache.seatunnel:seatunnel-metrics-console:jar</include>
+                <include>org.apache.seatunnel:seatunnel-metrics-prometheus:jar</include>
+            </includes>
+            <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
+            <outputDirectory>/option</outputDirectory>
+            <scope>provided</scope>
+        </dependencySet>
     </dependencySets>
 </assembly>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 16f5aefe4..7c5a6e735 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -142,5 +142,17 @@
             <outputDirectory>/lib</outputDirectory>
             <scope>provided</scope>
         </dependencySet>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+            <unpack>false</unpack>
+            <includes>
+                <include>org.apache.seatunnel:seatunnel-metrics-console:jar</include>
+                <include>org.apache.seatunnel:seatunnel-metrics-prometheus:jar</include>
+            </includes>
+            <outputFileNameMapping>${artifact.file.name}</outputFileNameMapping>
+            <outputDirectory>/option</outputDirectory>
+            <scope>provided</scope>
+        </dependencySet>
     </dependencySets>
 </assembly>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 15da3772b..9b628ba46 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -24,6 +24,7 @@ env {
 
 source {
     FakeSource {
+      row.num = 16
       result_table_name = "fake"
       schema = {
         fields {
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
index b2535ebf2..5ade8cd82 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java
@@ -57,4 +57,12 @@ public class FakeSourceToFileIT extends SparkContainer {
         Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @Test
+    public void testFakeSourceToLocalFileORCAndReadToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_orc.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Container.ExecResult execResult2 = executeSeaTunnelSparkJob("/file/local_orc_source_to_console.conf");
+        Assertions.assertEquals(0, execResult2.getExitCode());
+    }
 }
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf
similarity index 55%
copy from seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
copy to seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf
index e04880dd2..9bdf41992 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf
@@ -6,7 +6,7 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,28 +15,23 @@
 # limitations under the License.
 #
 
-######
-###### This config file is a demonstration of batch processing in SeaTunnel config
-######
-
 env {
   # You can set spark configuration here
-  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
-  #job.mode = BATCH
   spark.app.name = "SeaTunnel"
   spark.executor.instances = 2
   spark.executor.cores = 1
   spark.executor.memory = "1g"
   spark.master = local
+  job.mode = "BATCH"
 }
 
 source {
-  # This is a example input plugin **only for test and demonstrate the feature input plugin**
   FakeSource {
+    result_table_name = "fake"
     schema = {
       fields {
         c_map = "map<string, string>"
-        c_array = "array<int>"
+        c_array = "array<tinyint>"
         c_string = string
         c_boolean = boolean
         c_tinyint = tinyint
@@ -46,33 +41,20 @@ source {
         c_float = float
         c_double = double
         c_decimal = "decimal(30, 8)"
-        c_null = "null"
         c_bytes = bytes
         c_date = date
         c_timestamp = timestamp
       }
     }
-    result_table_name = "fake"
   }
 
-  # You can also use other input plugins, such as hdfs
-  # hdfs {
-  #   result_table_name = "accesslog"
-  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog"
-  #   format = "json"
-  # }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
-  # please go to https://seatunnel.apache.org/docs/category/source-v2
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
-  # split data by specific delimiter
-
-  # you can also use other transform plugins, such as sql
   sql {
-    sql = "select c_map,c_array,c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes,c_date,c_timestamp from fake"
-    result_table_name = "sql"
+    sql = "select * from fake"
   }
 
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
@@ -80,15 +62,17 @@ transform {
 }
 
 sink {
-  # choose stdout output plugin to output data to console
-  Console {}
-
-  # you can also you other output plugins, such as sql
-  # hdfs {
-  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
-  #   save_mode = "append"
-  # }
+  LocalFile {
+    path="/tmp/test/orc/"
+    partition_by=["c_boolean"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="orc"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
 
-  # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
-  # please go to https://seatunnel.apache.org/docs/category/sink-v2
-}
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
+}
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf
similarity index 61%
copy from seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
copy to seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf
index 6a89b64a6..fe64d423d 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf
@@ -14,46 +14,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  #job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
 }
 
 source {
-  # This is a example source plugin **only for test and demonstrate the feature source plugin**
-    FakeSource {
-      result_table_name = "fake"
-      schema = {
-        fields {
-          name = "string"
-          age = "int"
-        }
-      }
-    }
+  LocalFile {
+    path = "/tmp/test/orc/"
+    type = "orc"
+  }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/category/source-v2
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
 }
 
 transform {
-    sql {
-      sql = "select name,age from fake"
-    }
+
 
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
   # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
-  Console {}
+  Console {
+  }
 
+  Assert {
+    rules =
+      [{
+        field_name = c_string
+        field_type = string
+        field_value = [
+          {
+            rule_type = NOT_NULL
+          }
+        ]
+      }, {
+        field_name = c_boolean
+        field_type = boolean
+        field_value = [
+          {
+            rule_type = NOT_NULL
+          }
+        ]
+      }
+      ]
+  }
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
 }
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 6a89b64a6..5c7ad0d3a 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -21,15 +21,22 @@
 env {
   # You can set flink configuration here
   execution.parallelism = 1
-  #job.mode = "BATCH"
+  job.mode = "STREAMING"
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  seatunnel.metrics.class = org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter
+  #seatunnel.metrics.class = org.apache.seatunnel.metrics.console.ConsoleLogReporter
+  seatunnel.metrics.interval = 10
+  seatunnel.metrics.host = localhost
+  seatunnel.metrics.port = 9091
+  seatunnel.metrics.jobName = flinkJob
 }
 
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
+      row.num = 16
       schema = {
         fields {
           name = "string"
@@ -56,4 +63,4 @@ sink {
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/category/sink-v2
-}
\ No newline at end of file
+}
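
The PrometheusPushGatewayReporter settings above assume a Prometheus Pushgateway reachable at the configured host and port; for a local test the stock prom/pushgateway image, which listens on 9091 by default, can be started with docker run -d -p 9091:9091 prom/pushgateway. Metrics are then pushed under the configured jobName, presumably every seatunnel.metrics.interval seconds (the unit is not spelled out in this diff).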
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index e67cdc720..f91023997 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>seatunnel-connector-flink-assert</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-flink</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--flink-->
@@ -107,4 +112,4 @@
         </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
index 9c6ad1888..2b96ef423 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
@@ -23,6 +23,12 @@ env {
   execution.parallelism = 1
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  seatunnel.metrics.class = org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter
+  #seatunnel.metrics.class = org.apache.seatunnel.metrics.console.ConsoleLogReporter
+  seatunnel.metrics.interval = 10
+  seatunnel.metrics.host = localhost
+  seatunnel.metrics.port = 9091
+  seatunnel.metrics.jobName = flinkJob
 }
 
 source {
@@ -50,4 +56,4 @@ sink {
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
\ No newline at end of file
+}
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index e04880dd2..43d822001 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -28,11 +28,16 @@ env {
   spark.executor.cores = 1
   spark.executor.memory = "1g"
   spark.master = local
+  seatunnel.metrics.class = org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter
+  seatunnel.metrics.host = localhost
+  seatunnel.metrics.port = 9091
+  seatunnel.metrics.jobName = sparkJob
 }
 
 source {
   # This is a example input plugin **only for test and demonstrate the feature input plugin**
   FakeSource {
+    row.num = 16
     schema = {
       fields {
         c_map = "map<string, string>"
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 0254e3e92..dcb17c16a 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>connector-clickhouse</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-spark</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--spark-->
@@ -100,4 +105,4 @@
             <artifactId>slf4j-log4j12</artifactId>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
index b7c6aa485..c35ac47f3 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
@@ -27,6 +27,11 @@ env {
   spark.executor.cores = 1
   spark.executor.memory = "1g"
   spark.master = local
+  #seatunnel.metrics.class = org.apache.seatunnel.metrics.console.ConsoleLogReporter
+  seatunnel.metrics.class = org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter
+  seatunnel.metrics.host = localhost
+  seatunnel.metrics.port = 9091
+  seatunnel.metrics.jobName = sparkJob
 }
 
 source {
diff --git a/seatunnel-metrics/pom.xml b/seatunnel-metrics/pom.xml
new file mode 100644
index 000000000..a5bc23719
--- /dev/null
+++ b/seatunnel-metrics/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>seatunnel-metrics-core</module>
+        <module>seatunnel-metrics-flink</module>
+        <module>seatunnel-metrics-spark</module>
+        <module>seatunnel-metrics-prometheus</module>
+        <module>seatunnel-metrics-console</module>
+    </modules>
+</project>
diff --git a/seatunnel-metrics/seatunnel-metrics-console/pom.xml b/seatunnel-metrics/seatunnel-metrics-console/pom.xml
new file mode 100644
index 000000000..7e657dc67
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-console/pom.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-metrics</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics-console</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-core</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-metrics/seatunnel-metrics-console/src/main/java/org/apache/seatunnel/metrics/console/ConsoleLogReporter.java b/seatunnel-metrics/seatunnel-metrics-console/src/main/java/org/apache/seatunnel/metrics/console/ConsoleLogReporter.java
new file mode 100644
index 000000000..7aa645bba
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-console/src/main/java/org/apache/seatunnel/metrics/console/ConsoleLogReporter.java
@@ -0,0 +1,115 @@
+package org.apache.seatunnel.metrics.console;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.MetricConfig;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A reporter that writes metric measurements to the log.
+ */
+public class ConsoleLogReporter implements MetricReporter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConsoleLogReporter.class);
+    private static final String LINE_SEPARATOR = System.lineSeparator();
+    private static final int DEFAULT_SIZE = 16384;
+    private int previousSize = DEFAULT_SIZE;
+
+    @Override
+    public void open(MetricConfig config) {
+        LOG.info("reporter open");
+    }
+
+    @Override
+    public void close() {
+        LOG.info("reporter close");
+    }
+
+    @Override
+    public void report(Map<Gauge, MetricInfo> gauges,
+                       Map<Counter, MetricInfo> counters,
+                       Map<Histogram, MetricInfo> histograms,
+                       Map<Meter, MetricInfo> meters) {
+        final double multiple = 1.1;
+        StringBuilder builder = new StringBuilder((int) (previousSize * multiple));
+
+        builder.append(LINE_SEPARATOR)
+            .append(
+                "=========================== Starting metrics report ===========================")
+            .append(LINE_SEPARATOR);
+
+        builder.append(LINE_SEPARATOR)
+            .append(
+                "-- Counters -------------------------------------------------------------------")
+            .append(LINE_SEPARATOR);
+
+        for (Map.Entry<Counter, MetricInfo> metric : counters.entrySet()) {
+            builder.append(metric.getValue().toString())
+                .append(metric.getKey().getMetricType().toString())
+                .append(": ")
+                .append(metric.getKey().getCount())
+                .append(LINE_SEPARATOR)
+                .append(LINE_SEPARATOR);
+
+        }
+
+        builder.append(LINE_SEPARATOR)
+            .append(
+                "-- Gauges -------------------------------------------------------------------")
+            .append(LINE_SEPARATOR);
+
+        for (Map.Entry<Gauge, MetricInfo> metric : gauges.entrySet()) {
+            builder.append(metric.getValue().toString())
+                .append(metric.getKey().getMetricType().toString())
+                .append(": ")
+                .append(metric.getKey().getValue())
+                .append(LINE_SEPARATOR)
+                .append(LINE_SEPARATOR);
+
+        }
+
+        builder.append(LINE_SEPARATOR)
+            .append(
+                "-- Meters -------------------------------------------------------------------")
+            .append(LINE_SEPARATOR);
+
+        for (Map.Entry<Meter, MetricInfo> metric : meters.entrySet()) {
+            builder.append(metric.getValue().toString())
+                .append(metric.getKey().getMetricType().toString())
+                .append(": ")
+                .append(metric.getKey().getRate())
+                .append(LINE_SEPARATOR)
+                .append(LINE_SEPARATOR);
+
+        }
+
+        builder.append(LINE_SEPARATOR)
+            .append(
+                "-- Histograms -------------------------------------------------------------------")
+            .append(LINE_SEPARATOR);
+
+        for (Map.Entry<Histogram, MetricInfo> metric : histograms.entrySet()) {
+            builder.append(metric.getValue().toString())
+                .append(metric.getKey().getMetricType().toString())
+                .append(LINE_SEPARATOR)
+                .append(metric.getKey().toString())
+                .append(LINE_SEPARATOR)
+                .append(LINE_SEPARATOR);
+        }
+
+        LOG.info(builder.toString());
+
+        previousSize = builder.length();
+    }
+}
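For reference, a minimal sketch of driving this reporter by hand, using the SimpleCounter and MetricInfo classes added later in this commit (the class name ConsoleReporterExample is illustrative only):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.seatunnel.metrics.console.ConsoleLogReporter;
    import org.apache.seatunnel.metrics.core.Counter;
    import org.apache.seatunnel.metrics.core.MetricConfig;
    import org.apache.seatunnel.metrics.core.MetricInfo;
    import org.apache.seatunnel.metrics.core.SimpleCounter;

    public class ConsoleReporterExample {
        public static void main(String[] args) {
            ConsoleLogReporter reporter = new ConsoleLogReporter();
            reporter.open(new MetricConfig());

            // One counter with a single dimension, keyed by its metadata
            Map<Counter, MetricInfo> counters = new HashMap<>();
            counters.put(new SimpleCounter(42),
                    new MetricInfo("seatunnel_demo_numRecordsIn",
                            "numRecordsIn (scope: demo)",
                            Arrays.asList("job_name"),
                            Arrays.asList("demo")));

            reporter.report(new HashMap<>(), counters, new HashMap<>(), new HashMap<>());
            reporter.close();
        }
    }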
diff --git a/seatunnel-metrics/seatunnel-metrics-core/pom.xml b/seatunnel-metrics/seatunnel-metrics-core/pom.xml
new file mode 100644
index 000000000..aa425423d
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-metrics</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics-core</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Counter.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Counter.java
new file mode 100644
index 000000000..ac0baf5d9
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Counter.java
@@ -0,0 +1,18 @@
+package org.apache.seatunnel.metrics.core;
+
+/** A Counter is a {@link Metric} that measures a count. */
+public interface Counter extends Metric {
+    void inc();
+
+    void inc(long n);
+
+    void dec();
+
+    void dec(long n);
+
+    long getCount();
+
+    default MetricType getMetricType() {
+        return MetricType.COUNTER;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Gauge.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Gauge.java
new file mode 100644
index 000000000..3b2bb1e0e
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Gauge.java
@@ -0,0 +1,10 @@
+package org.apache.seatunnel.metrics.core;
+
+/** A Gauge is a {@link Metric} that calculates a specific value at a point in time. */
+public interface Gauge<T> extends Metric{
+    T getValue();
+
+    default MetricType getMetricType() {
+        return MetricType.GAUGE;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Histogram.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Histogram.java
new file mode 100644
index 000000000..847011bd9
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Histogram.java
@@ -0,0 +1,21 @@
+package org.apache.seatunnel.metrics.core;
+
+import java.util.Map;
+
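+/** A Histogram is a {@link Metric} that reports statistics on a distribution of recorded values. */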
+public interface Histogram extends Metric {
+    long getCount();
+
+    double getMin();
+
+    double getMax();
+
+    double getStdDev();
+
+    double getMean();
+
+    Map<Double, Double> getQuantile();
+
+    default MetricType getMetricType() {
+        return MetricType.HISTOGRAM;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Meter.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Meter.java
new file mode 100644
index 000000000..497a2b35f
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Meter.java
@@ -0,0 +1,13 @@
+package org.apache.seatunnel.metrics.core;
+
+/** Metric for measuring throughput. */
+public interface Meter extends Metric {
+
+    double getRate();
+
+    long getCount();
+
+    default MetricType getMetricType() {
+        return MetricType.METER;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Metric.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Metric.java
new file mode 100644
index 000000000..72242a2f6
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/Metric.java
@@ -0,0 +1,6 @@
+package org.apache.seatunnel.metrics.core;
+
+/** Common super interface for all metrics. */
+public interface Metric {
+    MetricType getMetricType();
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricConfig.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricConfig.java
new file mode 100644
index 000000000..018321a3c
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricConfig.java
@@ -0,0 +1,50 @@
+package org.apache.seatunnel.metrics.core;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricConfig {
+    private String jobName;
+    private String host;
+    private int port;
+    private Map<String, String> configs;
+
+    public MetricConfig() {
+        configs = new HashMap<>();
+        jobName = "config";
+        host = "localhost";
+        port = 0;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public Map<String, String> getConfigs() {
+        return configs;
+    }
+
+    public void setConfigs(Map<String, String> configs) {
+        this.configs = configs;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricInfo.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricInfo.java
new file mode 100644
index 000000000..bc51a4792
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricInfo.java
@@ -0,0 +1,55 @@
+package org.apache.seatunnel.metrics.core;
+
+import java.util.List;
+
+/**
+ * Describes a metric: its name, help text, and dimension keys/values.
+ */
+public class MetricInfo {
+    private String metricName;
+    private List<String> dimensionKeys;
+    private List<String> dimensionValues;
+    private String helpString;
+
+    public MetricInfo(String metricName, String helpString, List<String> dimensionKeys, List<String> dimensionValues) {
+        this.metricName = metricName;
+        this.helpString = helpString;
+        this.dimensionKeys = dimensionKeys;
+        this.dimensionValues = dimensionValues;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public String getHelpString() {
+        return helpString;
+    }
+
+    public List<String> getDimensionKeys() {
+        return dimensionKeys;
+    }
+
+    public List<String> getDimensionValues() {
+        return dimensionValues;
+    }
+
+    @Override
+    public String toString() {
+        String lineSeparator = System.lineSeparator();
+        StringBuilder builder = new StringBuilder();
+        builder.append("metricName: ")
+            .append(this.metricName)
+            .append(lineSeparator);
+        builder.append("helpString: ")
+            .append(this.helpString)
+            .append(lineSeparator);
+        for (int i = 0; i < this.dimensionKeys.size(); i++) {
+            builder.append(dimensionKeys.get(i))
+                .append(": ")
+                .append(dimensionValues.get(i))
+                .append(lineSeparator);
+        }
+        return builder.toString();
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricType.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricType.java
new file mode 100644
index 000000000..b4f203e7b
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricType.java
@@ -0,0 +1,9 @@
+package org.apache.seatunnel.metrics.core;
+
+/** Enum describing the different metric types. */
+public enum MetricType {
+    COUNTER,
+    METER,
+    GAUGE,
+    HISTOGRAM
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleCounter.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleCounter.java
new file mode 100644
index 000000000..acd6f0c14
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleCounter.java
@@ -0,0 +1,29 @@
+package org.apache.seatunnel.metrics.core;
+
+public class SimpleCounter implements Counter {
+    private long count;
+
+    public SimpleCounter(long count) {
+        this.count = count;
+    }
+
+    public void inc() {
+        ++this.count;
+    }
+
+    public void inc(long n) {
+        this.count += n;
+    }
+
+    public void dec() {
+        --this.count;
+    }
+
+    public void dec(long n) {
+        this.count -= n;
+    }
+
+    public long getCount() {
+        return this.count;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleGauge.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleGauge.java
new file mode 100644
index 000000000..50c69864e
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleGauge.java
@@ -0,0 +1,15 @@
+package org.apache.seatunnel.metrics.core;
+
+public class SimpleGauge implements Gauge<Number> {
+    private Number value;
+
+    public SimpleGauge(Number value) {
+        this.value = value;
+    }
+
+    @Override
+    public Number getValue() {
+        return this.value;
+    }
+
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleHistogram.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleHistogram.java
new file mode 100644
index 000000000..b3ee29c77
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleHistogram.java
@@ -0,0 +1,73 @@
+package org.apache.seatunnel.metrics.core;
+
+import java.util.Map;
+
+public class SimpleHistogram implements Histogram {
+    private long count;
+    private double min;
+    private double max;
+    private double stdDev;
+    private double mean;
+    private Map<Double, Double> quantile;
+
+    public SimpleHistogram(long count, double min, double max, double stdDev, double mean, Map<Double, Double> quantile) {
+        this.count = count;
+        this.min = min;
+        this.max = max;
+        this.stdDev = stdDev;
+        this.mean = mean;
+        this.quantile = quantile;
+    }
+
+    @Override
+    public long getCount() {
+        return this.count;
+    }
+
+    @Override
+    public double getMin() {
+        return this.min;
+    }
+
+    @Override
+    public double getMax() {
+        return this.max;
+    }
+
+    @Override
+    public double getStdDev() {
+        return this.stdDev;
+    }
+
+    @Override
+    public double getMean() {
+        return this.mean;
+    }
+
+    @Override
+    public Map<Double, Double> getQuantile() {
+        return this.quantile;
+    }
+
+    @Override
+    public String toString() {
+        String lineSeparator = System.lineSeparator();
+        StringBuilder builder = new StringBuilder();
+        builder.append("count: ")
+                .append(count)
+                .append(lineSeparator)
+                .append("min: ")
+                .append(min)
+                .append(lineSeparator)
+                .append("max: ")
+                .append(max)
+                .append(lineSeparator)
+                .append("stdDev: ")
+                .append(stdDev)
+                .append(lineSeparator)
+                .append("mean: ")
+                .append(mean)
+                .append(lineSeparator);
+        return builder.toString();
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleMeter.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleMeter.java
new file mode 100644
index 000000000..fb462848e
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/SimpleMeter.java
@@ -0,0 +1,22 @@
+package org.apache.seatunnel.metrics.core;
+
+public class SimpleMeter implements Meter {
+
+    private double rate;
+    private long count;
+
+    public SimpleMeter(double rate, long count) {
+        this.rate = rate;
+        this.count = count;
+    }
+
+    @Override
+    public double getRate() {
+        return this.rate;
+    }
+
+    @Override
+    public long getCount() {
+        return this.count;
+    }
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/reporter/MetricReporter.java b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/reporter/MetricReporter.java
new file mode 100644
index 000000000..295b3c82e
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/reporter/MetricReporter.java
@@ -0,0 +1,23 @@
+package org.apache.seatunnel.metrics.core.reporter;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.Metric;
+import org.apache.seatunnel.metrics.core.MetricConfig;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+
+import java.util.Map;
+
+/** Reporters are used to export seatunnel {@link Metric Metrics} to an external backend. */
+public interface MetricReporter {
+    void open(MetricConfig config);
+
+    void close();
+
+    void report(Map<Gauge, MetricInfo> gauges,
+                Map<Counter, MetricInfo> counters,
+                Map<Histogram, MetricInfo> histograms,
+                Map<Meter, MetricInfo> meters);
+}
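New backends plug in by implementing this interface. Both engine integrations in this commit instantiate the reporter reflectively via Class.forName(...).newInstance(), so an implementation needs a public no-arg constructor. A skeletal sketch (the class and package names are illustrative only):

    package org.apache.seatunnel.metrics.example; // hypothetical

    import java.util.Map;

    import org.apache.seatunnel.metrics.core.Counter;
    import org.apache.seatunnel.metrics.core.Gauge;
    import org.apache.seatunnel.metrics.core.Histogram;
    import org.apache.seatunnel.metrics.core.Meter;
    import org.apache.seatunnel.metrics.core.MetricConfig;
    import org.apache.seatunnel.metrics.core.MetricInfo;
    import org.apache.seatunnel.metrics.core.reporter.MetricReporter;

    public class LoggingOnlyReporter implements MetricReporter {
        @Override
        public void open(MetricConfig config) {
            // e.g. connect to config.getHost():config.getPort()
        }

        @Override
        public void close() {
            // release resources acquired in open()
        }

        @Override
        public void report(Map<Gauge, MetricInfo> gauges,
                           Map<Counter, MetricInfo> counters,
                           Map<Histogram, MetricInfo> histograms,
                           Map<Meter, MetricInfo> meters) {
            // ship the four metric families to the backend
        }
    }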
diff --git a/seatunnel-metrics/seatunnel-metrics-flink/pom.xml b/seatunnel-metrics/seatunnel-metrics-flink/pom.xml
new file mode 100644
index 000000000..80dc74724
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-flink/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-metrics</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics-flink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.1.13.6.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-core</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-prometheus</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-console</artifactId>
+            <version>${revision}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/AbstractSeatunnelReporter.java b/seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/AbstractSeatunnelReporter.java
new file mode 100644
index 000000000..0f6670f3c
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/AbstractSeatunnelReporter.java
@@ -0,0 +1,106 @@
+package org.apache.seatunnel.metrics.flink;
+
+import org.apache.seatunnel.metrics.core.MetricInfo;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** base seatunnel reporter for flink metrics. */
+public abstract class AbstractSeatunnelReporter implements MetricReporter {
+
+    private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+    private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+        public String filterCharacters(String input) {
+            return AbstractSeatunnelReporter.replaceInvalidChars(input);
+        }
+    };
+
+    private CharacterFilter labelValueCharactersFilter = CHARACTER_FILTER;
+
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    protected final Map<Gauge<?>, MetricInfo> gauges = new HashMap<>();
+    protected final Map<Counter, MetricInfo> counters = new HashMap<>();
+    protected final Map<Histogram, MetricInfo> histograms = new HashMap<>();
+    protected final Map<Meter, MetricInfo> meters = new HashMap<>();
+
+    public AbstractSeatunnelReporter() {
+    }
+
+    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+        String scopedMetricName = getScopedName(metricName, group);
+        String helpString = metricName + " (scope: " + getLogicalScope(group) + ")";
+        List<String> dimensionKeys = new LinkedList<>();
+        List<String> dimensionValues = new LinkedList<>();
+        for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+            final String key = dimension.getKey();
+            dimensionKeys.add(
+                    CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
+            dimensionValues.add(labelValueCharactersFilter.filterCharacters(dimension.getValue()));
+        }
+        String metricNameFin = scopedMetricName;
+        if (dimensionKeys.size() > 0 && dimensionKeys.get(dimensionKeys.size() - 1).equals("subtask_index")) {
+            metricNameFin = scopedMetricName + "_" + dimensionValues.get(dimensionValues.size() - 1);
+        }
+        MetricInfo metricInfo = new MetricInfo(metricNameFin, helpString, dimensionKeys, dimensionValues);
+        synchronized (this) {
+            if (metric instanceof Counter) {
+                this.counters.put((Counter) metric, metricInfo);
+            } else if (metric instanceof Gauge) {
+                this.gauges.put((Gauge) metric, metricInfo);
+            } else if (metric instanceof Histogram) {
+                this.histograms.put((Histogram) metric, metricInfo);
+            } else if (metric instanceof Meter) {
+                this.meters.put((Meter) metric, metricInfo);
+            } else {
+                this.log.warn("Cannot add unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
+            }
+        }
+    }
+
+    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+        synchronized (this) {
+            if (metric instanceof Counter) {
+                this.counters.remove(metric);
+            } else if (metric instanceof Gauge) {
+                this.gauges.remove(metric);
+            } else if (metric instanceof Histogram) {
+                this.histograms.remove(metric);
+            } else if (metric instanceof Meter) {
+                this.meters.remove(metric);
+            } else {
+                this.log.warn("Cannot remove unknown metric type {}. This indicates that the reporter does not support this metric type.", metric.getClass().getName());
+            }
+        }
+    }
+
+    private static String getScopedName(String metricName, MetricGroup group) {
+        return "seatunnel_" + getLogicalScope(group) + '_' + CHARACTER_FILTER.filterCharacters(metricName);
+    }
+
+    private static String getLogicalScope(MetricGroup group) {
+        return ((FrontMetricGroup<AbstractMetricGroup<?>>) group)
+                .getLogicalScope(CHARACTER_FILTER, '_');
+    }
+
+    static String replaceInvalidChars(String input) {
+        return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+    }
+}
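Note on naming: getScopedName prefixes every metric with "seatunnel_" plus the underscore-joined logical scope, and replaceInvalidChars maps anything outside [a-zA-Z0-9:_] to '_'. A Flink metric numRecordsIn in logical scope taskmanager.job.task is therefore exported as seatunnel_taskmanager_job_task_numRecordsIn; when the last dimension is subtask_index, notifyOfAddedMetric additionally suffixes the subtask number.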
diff --git a/seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/SeatunnelMetricReporter.java b/seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/SeatunnelMetricReporter.java
new file mode 100644
index 000000000..51168c4bb
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/SeatunnelMetricReporter.java
@@ -0,0 +1,123 @@
+package org.apache.seatunnel.metrics.flink;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+import org.apache.seatunnel.metrics.core.SimpleCounter;
+import org.apache.seatunnel.metrics.core.SimpleGauge;
+import org.apache.seatunnel.metrics.core.SimpleHistogram;
+import org.apache.seatunnel.metrics.core.SimpleMeter;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Exports Flink metrics to a SeaTunnel {@link MetricReporter}.
+ */
+public class SeatunnelMetricReporter extends AbstractSeatunnelReporter implements Scheduled {
+    private MetricReporter reporter;
+    private String host;
+    private int port;
+    private String jobName;
+    private String className;
+    private static final int DEFAULT_PORT = 9091;
+
+    @Override
+    public void open(MetricConfig config) {
+        host = config.getString("host", "localhost");
+        port = config.getInteger("port", DEFAULT_PORT);
+        jobName = config.getString("jobName", "flinkJob");
+        className = config.getString("reporterName", "org.apache.seatunnel.metrics.console.ConsoleLogReporter");
+    }
+
+    @Override
+    public void close() {
+        log.info("StreamMetricReporter close");
+    }
+
+    @Override
+    public void report() {
+        log.info("reporter report");
+        HashMap<Counter, MetricInfo> countersIndex = new HashMap<>();
+        HashMap<Gauge, MetricInfo> gaugesIndex = new HashMap<>();
+        HashMap<Histogram, MetricInfo> histogramsIndex = new HashMap<>();
+        HashMap<Meter, MetricInfo> metersIndex = new HashMap<>();
+
+        HashSet<String> name = new HashSet<>();
+
+        // Convert Flink metrics to SeaTunnel metrics
+        for (Map.Entry<org.apache.flink.metrics.Counter, MetricInfo> metric : counters.entrySet()) {
+            // Skip duplicate metric names
+            if (names.contains(metric.getValue().getMetricName())) {
+                continue;
+            }
+            names.add(metric.getValue().getMetricName());
+            countersIndex.put(new SimpleCounter(metric.getKey().getCount()), metric.getValue());
+        }
+
+        names.clear();
+        for (Map.Entry<org.apache.flink.metrics.Gauge<?>, MetricInfo> metric : gauges.entrySet()) {
+            // Skip duplicate metric names
+            if (names.contains(metric.getValue().getMetricName())) {
+                continue;
+            }
+            names.add(metric.getValue().getMetricName());
+            Object num = metric.getKey().getValue();
+            if (num instanceof Number) {
+                gaugesIndex.put(new SimpleGauge((Number) num), metric.getValue());
+            }
+        }
+
+        names.clear();
+        for (Map.Entry<org.apache.flink.metrics.Meter, MetricInfo> metric : meters.entrySet()) {
+            // Skip duplicate metric names
+            if (names.contains(metric.getValue().getMetricName())) {
+                continue;
+            }
+            names.add(metric.getValue().getMetricName());
+            metersIndex.put(new SimpleMeter(metric.getKey().getRate(), metric.getKey().getCount()), metric.getValue());
+        }
+        final double quantile50 = 0.5;
+        final double quantile75 = 0.75;
+        final double quantile95 = 0.95;
+        // todo: histogram
+        for (Map.Entry<org.apache.flink.metrics.Histogram, MetricInfo> metric : histograms.entrySet()) {
+            org.apache.flink.metrics.Histogram key = metric.getKey();
+            HashMap<Double, Double> quantile = new HashMap<>();
+            quantile.put(quantile50, key.getStatistics().getQuantile(quantile50));
+            quantile.put(quantile75, key.getStatistics().getQuantile(quantile75));
+            quantile.put(quantile95, key.getStatistics().getQuantile(quantile95));
+            histogramsIndex.put(new SimpleHistogram(key.getCount(), key.getStatistics().getMin(), key.getStatistics().getMax(), key.getStatistics().getStdDev(), key.getStatistics().getMean(), quantile), metric.getValue());
+        }
+        // todo: handle user-supplied reporter config
+        try {
+            Class<?> aClass = Class.forName(className);
+            reporter = (MetricReporter) aClass.newInstance();
+            org.apache.seatunnel.metrics.core.MetricConfig config = new org.apache.seatunnel.metrics.core.MetricConfig();
+            config.setJobName(jobName);
+            config.setHost(host);
+            config.setPort(port);
+            reporter.open(config);
+            reporter.report(gaugesIndex, countersIndex, histogramsIndex, metersIndex);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
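This reporter is wired in through Flink's standard reporter mechanism. A sketch of the flink-conf.yaml entries, assuming the reporter is registered under the name seatunnel (the host/port/jobName/reporterName keys mirror the lookups in open(); interval is honored because the class implements Scheduled):

    metrics.reporter.seatunnel.class: org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter
    metrics.reporter.seatunnel.interval: 10 SECONDS
    metrics.reporter.seatunnel.host: localhost
    metrics.reporter.seatunnel.port: 9091
    metrics.reporter.seatunnel.jobName: flinkJob
    metrics.reporter.seatunnel.reporterName: org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter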
diff --git a/seatunnel-metrics/seatunnel-metrics-prometheus/pom.xml b/seatunnel-metrics/seatunnel-metrics-prometheus/pom.xml
new file mode 100644
index 000000000..7a97f70b4
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-prometheus/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-metrics</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics-prometheus</artifactId>
+    <properties>
+        <prometheus.version>0.9.0</prometheus.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_pushgateway</artifactId>
+            <version>${prometheus.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-core</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-metrics/seatunnel-metrics-prometheus/src/main/java/org/apache/seatunnel/metrics/prometheus/PrometheusPushGatewayReporter.java b/seatunnel-metrics/seatunnel-metrics-prometheus/src/main/java/org/apache/seatunnel/metrics/prometheus/PrometheusPushGatewayReporter.java
new file mode 100644
index 000000000..8e00f1c01
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-prometheus/src/main/java/org/apache/seatunnel/metrics/prometheus/PrometheusPushGatewayReporter.java
@@ -0,0 +1,230 @@
+package org.apache.seatunnel.metrics.prometheus;
+
+import org.apache.seatunnel.metrics.core.Counter;
+import org.apache.seatunnel.metrics.core.Gauge;
+import org.apache.seatunnel.metrics.core.Histogram;
+import org.apache.seatunnel.metrics.core.Meter;
+import org.apache.seatunnel.metrics.core.Metric;
+import org.apache.seatunnel.metrics.core.MetricConfig;
+import org.apache.seatunnel.metrics.core.MetricInfo;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A reporter which outputs measurements to PrometheusPushGateway
+ */
+public class PrometheusPushGatewayReporter implements MetricReporter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrometheusPushGatewayReporter.class);
+    private URL hostUrl;
+    private PushGateway pushGateway;
+    private String jobName;
+    private static final int DEFAULT_PORT = 9091;
+
+    @Override
+    public void open(MetricConfig config) {
+        if (isNullOrWhitespaceOnly(config.getHost()) || config.getPort() < 1) {
+            throw new IllegalArgumentException(
+                    "Invalid host/port configuration. Host: " + config.getHost() + " Port: " + config.getPort());
+        }
+        String url = "http://" + config.getHost() + ":" + config.getPort();
+
+        this.jobName = config.getJobName();
+        try {
+            this.hostUrl = new URL(url);
+        } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+        }
+        this.pushGateway = new PushGateway(hostUrl);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void report(Map<Gauge, MetricInfo> gauges,
+                       Map<Counter, MetricInfo> counters,
+                       Map<Histogram, MetricInfo> histograms,
+                       Map<Meter, MetricInfo> meters) {
+        Collector collector;
+        CollectorRegistry registry = new CollectorRegistry();
+        for (Map.Entry<Counter, MetricInfo> metric : counters.entrySet()) {
+            MetricInfo metricInfo = metric.getValue();
+            collector = createCollector(metric.getKey(), metricInfo.getMetricName(), metricInfo.getHelpString(), metricInfo.getDimensionKeys(), metricInfo.getDimensionValues());
+            try {
+                collector.register(registry);
+            } catch (Exception e) {
+                LOG.warn("There was a problem registering metric {}.", metric.getValue().toString(), e);
+            }
+            addMetric(metric.getKey(), metricInfo.getDimensionValues(), collector);
+        }
+
+        for (Map.Entry<Gauge, MetricInfo> metric : gauges.entrySet()) {
+            MetricInfo metricInfo = metric.getValue();
+            collector = createCollector(metric.getKey(), metricInfo.getMetricName(), metricInfo.getHelpString(), metricInfo.getDimensionKeys(), metricInfo.getDimensionValues());
+            try {
+                collector.register(registry);
+            } catch (Exception e) {
+                LOG.warn("There was a problem registering metric {}.", metric.getValue().toString(), e);
+            }
+            addMetric(metric.getKey(), metricInfo.getDimensionValues(), collector);
+        }
+
+        // todo: add histogram support
+
+        for (Map.Entry<Meter, MetricInfo> metric : meters.entrySet()) {
+            MetricInfo metricInfo = metric.getValue();
+            collector = createCollector(metric.getKey(), metricInfo.getMetricName(), metricInfo.getHelpString(), metricInfo.getDimensionKeys(), metricInfo.getDimensionValues());
+            try {
+                collector.register(registry);
+            } catch (Exception e) {
+                LOG.warn("There was a problem registering metric {}.", metric.getValue().toString(), e);
+            }
+            addMetric(metric.getKey(), metricInfo.getDimensionValues(), collector);
+        }
+
+        try {
+            pushGateway.pushAdd(registry, jobName);
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to push metrics to PushGateway with jobName {}.",
+                    jobName,
+                    e);
+        }
+    }
+
+    private Collector createCollector(Metric metric,
+                                      String metricName,
+                                      String helpString,
+                                      List<String> dimensionKeys,
+                                      List<String> dimensionValues) {
+        Collector collector;
+        switch (metric.getMetricType()) {
+            case GAUGE:
+            case COUNTER:
+            case METER:
+                collector =
+                        io.prometheus.client.Gauge.build()
+                                .name(metricName)
+                                .help(helpString)
+                                .labelNames(toArray(dimensionKeys))
+                                .create();
+                break;
+            case HISTOGRAM:
+                collector =
+                        io.prometheus.client.Histogram.build()
+                                .name(metricName)
+                                .help(helpString)
+                                .labelNames(toArray(dimensionKeys))
+                                .create();
+                break;
+            default:
+                LOG.warn(
+                        "Cannot create collector for unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+                        metric.getClass().getName());
+                collector = null;
+        }
+
+        return collector;
+    }
+
+    private void addMetric(Metric metric, List<String> dimensionValues, Collector collector) {
+        switch (metric.getMetricType()) {
+            case GAUGE:
+                ((io.prometheus.client.Gauge) collector)
+                        .setChild(gaugeFrom((Gauge<?>) metric), toArray(dimensionValues));
+                break;
+            case COUNTER:
+                ((io.prometheus.client.Gauge) collector)
+                        .setChild(gaugeFrom((Counter) metric), toArray(dimensionValues));
+                break;
+            case METER:
+                ((io.prometheus.client.Gauge) collector)
+                        .setChild(gaugeFrom((Meter) metric), toArray(dimensionValues));
+                break;
+            case HISTOGRAM:
+                // todo: histogram export is not implemented yet
+                LOG.warn("Histogram reporting to PushGateway is not supported yet.");
+                break;
+            default:
+                LOG.warn(
+                        "Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+                        metric.getClass().getName());
+        }
+    }
+
+    private static io.prometheus.client.Gauge.Child gaugeFrom(Gauge<?> gauge) {
+        return new io.prometheus.client.Gauge.Child() {
+            @Override
+            public double get() {
+                final Object value = gauge.getValue();
+                if (value == null) {
+                    LOG.debug("Gauge {} is null-valued, defaulting to 0.", gauge);
+                    return 0;
+                }
+                if (value instanceof Double) {
+                    return (double) value;
+                }
+                if (value instanceof Number) {
+                    return ((Number) value).doubleValue();
+                }
+                if (value instanceof Boolean) {
+                    return ((Boolean) value) ? 1 : 0;
+                }
+                LOG.debug(
+                        "Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+                        gauge,
+                        value.getClass().getName());
+                return 0;
+            }
+        };
+    }
+
+    private static io.prometheus.client.Gauge.Child gaugeFrom(Counter counter) {
+        return new io.prometheus.client.Gauge.Child() {
+            @Override
+            public double get() {
+                return (double) counter.getCount();
+            }
+        };
+    }
+
+    private static io.prometheus.client.Gauge.Child gaugeFrom(Meter meter) {
+        return new io.prometheus.client.Gauge.Child() {
+            @Override
+            public double get() {
+                return meter.getRate();
+            }
+        };
+    }
+
+    private static String[] toArray(List<String> list) {
+        return list.toArray(new String[list.size()]);
+    }
+
+    public static boolean isNullOrWhitespaceOnly(String str) {
+        if (str == null || str.length() == 0) {
+            return true;
+        }
+
+        final int len = str.length();
+        for (int i = 0; i < len; i++) {
+            if (!Character.isWhitespace(str.charAt(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
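A minimal sketch of pushing a counter through this reporter, assuming a Prometheus Pushgateway is listening on localhost:9091 (the driver class PushGatewayExample is illustrative only):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.seatunnel.metrics.core.Counter;
    import org.apache.seatunnel.metrics.core.MetricConfig;
    import org.apache.seatunnel.metrics.core.MetricInfo;
    import org.apache.seatunnel.metrics.core.SimpleCounter;
    import org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter;

    public class PushGatewayExample {
        public static void main(String[] args) {
            MetricConfig config = new MetricConfig();
            config.setJobName("demoJob");
            config.setHost("localhost");
            config.setPort(9091);

            PrometheusPushGatewayReporter reporter = new PrometheusPushGatewayReporter();
            reporter.open(config);

            // Counters are exported as Prometheus gauges set to the current count
            Map<Counter, MetricInfo> counters = Collections.singletonMap(
                    new SimpleCounter(42),
                    new MetricInfo("seatunnel_demo_counter", "a demo counter",
                            Collections.singletonList("job_name"),
                            Collections.singletonList("demo")));

            reporter.report(Collections.emptyMap(), counters,
                    Collections.emptyMap(), Collections.emptyMap());
            reporter.close();
        }
    }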
diff --git a/seatunnel-metrics/seatunnel-metrics-spark/pom.xml b/seatunnel-metrics/seatunnel-metrics-spark/pom.xml
new file mode 100644
index 000000000..634281ecc
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-spark/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-metrics</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics-spark</artifactId>
+
+    <dependencies>
+        <!--  spark. -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.2.4.0.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-core</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-prometheus</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-metrics-console</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/seatunnel/metrics/spark/SeatunnelMetricSink.scala b/seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/seatunnel/metrics/spark/SeatunnelMetricSink.scala
new file mode 100644
index 000000000..8f7c4dca5
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/seatunnel/metrics/spark/SeatunnelMetricSink.scala
@@ -0,0 +1,241 @@
+package org.apache.seatunnel.metrics.spark
+
+import java.util
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConversions._
+import com.codahale.metrics
+import com.codahale.metrics.{Counter, Histogram, Meter, _}
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter
+import org.apache.seatunnel.metrics.core.{Gauge, _}
+import org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter
+import org.apache.spark.internal.Logging
+
+object SeatunnelMetricSink {
+  trait SinkConfig extends Serializable {
+    def metricsNamespace: Option[String]
+
+    def sparkAppId: Option[String]
+
+    def sparkAppName: Option[String]
+
+    def executorId: Option[String]
+  }
+}
+
+abstract class SeatunnelMetricSink(
+    property: Properties,
+    registry: MetricRegistry,
+    sinkConfig: SeatunnelMetricSink.SinkConfig) extends Logging {
+
+  import sinkConfig._
+
+  protected class SeatunnelMetricReporter(registry: MetricRegistry, metricFilter: MetricFilter)
+    extends ScheduledReporter(
+      registry,
+      "seatunnel-reporter",
+      metricFilter,
+      TimeUnit.SECONDS,
+      TimeUnit.MILLISECONDS) {
+
+    override def report(
+        gauges: util.SortedMap[String, metrics.Gauge[_]],
+        counters: util.SortedMap[String, Counter],
+        histograms: util.SortedMap[String, Histogram],
+        meters: util.SortedMap[String, Meter],
+        timers: util.SortedMap[String, Timer]) = {
+
+      val role: String = (sparkAppId, executorId) match {
+        case (Some(_), Some("driver")) | (Some(_), Some("<driver>")) => "driver"
+        case (Some(_), Some(_)) => "executor"
+        case _ => "unknown"
+      }
+
+      val job: String = role match {
+        case "driver" | "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+        case _ => metricsNamespace.getOrElse("unknown")
+      }
+
+      val appName: String = sparkAppName.getOrElse("")
+
+      logInfo(s"role=$role, job=$job")
+
+      val dimensionKeys = new util.LinkedList[String]()
+      val dimensionValues = new util.LinkedList[String]()
+      dimensionKeys.add("job_name")
+      dimensionValues.add(appName)
+      dimensionKeys.add("job_id")
+      dimensionValues.add(job)
+      dimensionKeys.add("role")
+      dimensionValues.add(role)
+
+      val countersIndex = new util.HashMap[org.apache.seatunnel.metrics.core.Counter, MetricInfo]
+      val gaugesIndex = new util.HashMap[org.apache.seatunnel.metrics.core.Gauge[_], MetricInfo]
+      val histogramsIndex =
+        new util.HashMap[org.apache.seatunnel.metrics.core.Histogram, MetricInfo]
+      val metersIndex = new util.HashMap[org.apache.seatunnel.metrics.core.Meter, MetricInfo]
+
+      for (metricName <- gauges.keySet()) {
+        val metric = gauges.get(metricName)
+        val num = numeric(metric.getValue)
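+        // numeric() falls back to Long.MaxValue as a sentinel for unsupported value types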
+        if (num.toString != Long.MaxValue.toString) {
+          gaugesIndex.put(
+            new SimpleGauge(num),
+            newMetricInfo(metricName, dimensionKeys, dimensionValues))
+        } else {
+          logWarning(s"$metricName is not numeric; skipping")
+        }
+      }
+
+      for (metricName <- counters.keySet()) {
+        val metric = counters.get(metricName)
+        countersIndex.put(
+          new SimpleCounter(metric.getCount),
+          newMetricInfo(metricName, dimensionKeys, dimensionValues))
+      }
+
+      for (metricName <- meters.keySet()) {
+        val metric = meters.get(metricName)
+        metersIndex.put(
+          new SimpleMeter(metric.getMeanRate, metric.getCount),
+          newMetricInfo(metricName, dimensionKeys, dimensionValues))
+      }
+
+      for (metricName <- histograms.keySet()) {
+        val metric = histograms.get(metricName)
+        histogramsIndex.put(
+          new SimpleHistogram(
+            metric.getCount,
+            metric.getSnapshot.getMin,
+            metric.getSnapshot.getMax,
+            metric.getSnapshot.getStdDev,
+            metric.getSnapshot.getMean,
+            new util.HashMap[java.lang.Double, java.lang.Double]() {
+              put(0.75, metric.getSnapshot.get75thPercentile())
+              put(0.95, metric.getSnapshot.get95thPercentile())
+              put(0.99, metric.getSnapshot.get99thPercentile())
+            }),
+          newMetricInfo(metricName, dimensionKeys, dimensionValues))
+      }
+      try {
+        val aClass = Class.forName(pollReporter)
+        val reporter = aClass.newInstance.asInstanceOf[MetricReporter]
+        val config = new MetricConfig
+        config.setJobName(pollJobName)
+        config.setHost(pollHost)
+        config.setPort(pollPort)
+        reporter.open(config)
+        reporter.report(gaugesIndex, countersIndex, histogramsIndex, metersIndex)
+      } catch {
+        case e: Exception =>
+          throw new RuntimeException(e)
+      }
+    }
+
+  }
+
+  val CONSOLE_DEFAULT_PERIOD = 10
+  val CONSOLE_DEFAULT_UNIT = "SECONDS"
+  val CONSOLE_DEFAULT_HOST = "localhost"
+  val CONSOLE_DEFAULT_PORT = 9091
+  val CONSOLE_DEFAULT_JOB_NAME = "sparkJob"
+  val CONSOLE_DEFAULT_REPORTER_NAME = "org.apache.seatunnel.metrics.console.ConsoleLogReporter"
+
+  val CONSOLE_KEY_INTERVAL = "interval"
+  val CONSOLE_KEY_UNIT = "unit"
+  val CONSOLE_KEY_HOST = "host"
+  val CONSOLE_KEY_PORT = "port"
+  val CONSOLE_KEY_JOB_NAME = "jobName"
+  val CONSOLE_KEY_REPORTER_NAME = "reporterName"
+
+  val KEY_RE_METRICS_FILTER = "metrics-filter-([a-zA-Z][a-zA-Z0-9-]*)".r
+
+  val pollPeriod: Long = Option(property.getProperty(CONSOLE_KEY_INTERVAL)) match {
+    case Some(s) => s.toInt
+    case None => CONSOLE_DEFAULT_PERIOD
+  }
+
+  val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
+    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
+  }
+
+  val pollHost: String = Option(property.getProperty(CONSOLE_KEY_HOST)) match {
+    case Some(s) => s
+    case None => CONSOLE_DEFAULT_HOST
+  }
+
+  val pollPort: Int = Option(property.getProperty(CONSOLE_KEY_PORT)) match {
+    case Some(s) => s.toInt
+    case None => CONSOLE_DEFAULT_PORT
+  }
+
+  val pollJobName: String = Option(property.getProperty(CONSOLE_KEY_JOB_NAME)) match {
+    case Some(s) => s
+    case None => CONSOLE_DEFAULT_JOB_NAME
+  }
+
+  val pollReporter: String = Option(property.getProperty(CONSOLE_KEY_REPORTER_NAME)) match {
+    case Some(s) => s
+    case None => CONSOLE_DEFAULT_REPORTER_NAME
+  }
+
+  val metricsFilter: MetricFilter = MetricFilter.ALL
+
+  val seatunnelReporter = new SeatunnelMetricReporter(registry, metricsFilter)
+
+  def start(): Unit = {
+    seatunnelReporter.start(pollPeriod, pollUnit)
+  }
+
+  def stop(): Unit = {
+    seatunnelReporter.stop()
+  }
+
+  def report(): Unit = {
+    seatunnelReporter.report()
+  }
+
+  private def numeric(a: Any): Number = {
+    a.getClass.getSimpleName match {
+      case "Integer" => a.toString.toInt
+      case "Double" => a.toString.toDouble
+      case "Float" => a.toString.toFloat
+      case "Long" => a.toString.toLong
+      case "Short" => a.toString.toShort
+      case _ => Long.MaxValue
+    }
+  }
+
+  private def newMetricInfo(
+      info: String,
+      dimensionKeys: util.LinkedList[String],
+      dimensionValues: util.LinkedList[String]): MetricInfo = {
+    val proInfo = info.replace("-", "_")
+    val infos = proInfo.split("\\.")
+
+    var metricName = infos.drop(1).map(str => {
+      str + "_"
+    }).mkString("")
+    metricName = metricName.dropRight(1)
+    val seatunnelMetricName = "seatunnel_" + metricName
+
+    // dimensionKeys.add("sourceName")
+    // dimensionValues.add(infos.apply(2))
+
+    val helpString = infos(2) + " (scope: " + metricName + ")"
+
+    new MetricInfo(seatunnelMetricName, helpString, dimensionKeys, dimensionValues)
+  }
+
+}
diff --git a/seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/spark/seatunnel/metrics/sink/SeatunnelMetricSink.scala b/seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/spark/seatunnel/metrics/sink/SeatunnelMetricSink.scala
new file mode 100644
index 000000000..86600b4c1
--- /dev/null
+++ b/seatunnel-metrics/seatunnel-metrics-spark/src/main/java/org/apache/spark/seatunnel/metrics/sink/SeatunnelMetricSink.scala
@@ -0,0 +1,47 @@
+package org.apache.spark.seatunnel.metrics.sink
+
+import java.util.Properties
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.seatunnel.metrics.spark.SeatunnelMetricSink.SinkConfig
+import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.internal.config
+import org.apache.spark.metrics.sink.Sink
+import org.apache.spark.seatunnel.metrics.sink.SeatunnelMetricSink.SinkConfigProxy
+
+object SeatunnelMetricSink {
+
+  class SinkConfigProxy extends SinkConfig {
+    // SparkEnv may become available only after metrics sink creation, so SparkConf is
+    // retrieved from the Spark env here rather than during creation/initialisation of this sink.
+    @transient
+    private lazy val sparkConfig = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf(true))
+
+    // Don't use sparkConf.getOption("spark.metrics.namespace") as the underlying string won't be substituted.
+    def metricsNamespace: Option[String] = sparkConfig.get(config.METRICS_NAMESPACE)
+
+    def sparkAppId: Option[String] = sparkConfig.getOption("spark.app.id")
+
+    def sparkAppName: Option[String] = sparkConfig.getOption("spark.app.name")
+
+    def executorId: Option[String] = sparkConfig.getOption("spark.executor.id")
+  }
+}
+
+class SeatunnelMetricSink(property: Properties, registry: MetricRegistry, sinkConfig: SinkConfig)
+  extends org.apache.seatunnel.metrics.spark.SeatunnelMetricSink(property, registry, sinkConfig)
+  with Sink {
+
+  // Constructor required by MetricsSystem::registerSinks() for spark >= 3.2
+  def this(property: Properties, registry: MetricRegistry) = {
+    this(
+      property,
+      registry,
+      new SinkConfigProxy)
+  }
+
+  // Legacy Constructor required by MetricsSystem::registerSinks() for spark < 3.2
+  def this(property: Properties, registry: MetricRegistry, securityMgr: SecurityManager) = {
+    this(property, registry)
+  }
+}
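The Spark side hooks in through the standard metrics.properties mechanism. A sketch, assuming the sink is registered under the name seatunnel (the interval/unit/host/port/jobName/reporterName keys match the property lookups above):

    *.sink.seatunnel.class=org.apache.spark.seatunnel.metrics.sink.SeatunnelMetricSink
    *.sink.seatunnel.interval=10
    *.sink.seatunnel.unit=seconds
    *.sink.seatunnel.host=localhost
    *.sink.seatunnel.port=9091
    *.sink.seatunnel.jobName=sparkJob
    *.sink.seatunnel.reporterName=org.apache.seatunnel.metrics.prometheus.PrometheusPushGatewayReporter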
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 3994fc776..2a7dc726a 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -86,7 +86,10 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Ser
 
     private void createSplitEnumerator() throws Exception {
         if (restoredState != null && restoredState.size() > 0) {
-            StateT restoredEnumeratorState = enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+            StateT restoredEnumeratorState = null;
+            if (restoredState.containsKey(-1)) {
+                restoredEnumeratorState = enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+            }
             splitEnumerator = source.restoreEnumerator(coordinatedEnumeratorContext, restoredEnumeratorState);
             restoredState.forEach((subtaskId, splitBytes) -> {
                 if (subtaskId == -1) {
@@ -182,8 +185,6 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Ser
 
     @Override
     public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws Exception {
-        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
-        byte[] enumeratorStateBytes = enumeratorStateSerializer.serialize(enumeratorState);
         Map<Integer, List<byte[]>> allStates = readerMap.entrySet()
             .parallelStream()
             .collect(Collectors.toMap(
@@ -200,7 +201,11 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Ser
                         throw new RuntimeException(e);
                     }
                 }));
-        allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+        if (enumeratorState != null) {
+            byte[] enumeratorStateBytes = enumeratorStateSerializer.serialize(enumeratorState);
+            allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
+        }
         return allStates;
     }
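The two hunks above make the enumerator state optional in both directions: snapshotState only writes the -1 entry when the enumerator actually returns state, and restore only deserializes that entry when it exists, which avoids the NPE for sources whose enumerators snapshot null. The same guard is applied to ParallelSource below.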
 
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 95aa7f18e..147a5350c 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -79,7 +79,10 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serial
         // Create or restore split enumerator & reader
         try {
             if (restoredState != null && restoredState.size() > 0) {
-                StateT restoredEnumeratorState = enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+                StateT restoredEnumeratorState = null;
+                if (restoredState.containsKey(-1)) {
+                    restoredEnumeratorState = enumeratorStateSerializer.deserialize(restoredState.get(-1).get(0));
+                }
                 restoredSplitState = new ArrayList<>(restoredState.get(subtaskId).size());
                 for (byte[] splitBytes : restoredState.get(subtaskId)) {
                     restoredSplitState.add(splitSerializer.deserialize(splitBytes));
@@ -180,12 +183,14 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serial
 
     @Override
     public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws Exception {
-        byte[] enumeratorStateBytes = enumeratorStateSerializer.serialize(splitEnumerator.snapshotState(checkpointId));
-        List<SplitT> splitStates = reader.snapshotState(checkpointId);
         Map<Integer, List<byte[]>> allStates = new HashMap<>(2);
-        if (enumeratorStateBytes != null) {
+
+        StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
+        if (enumeratorState != null) {
+            byte[] enumeratorStateBytes = enumeratorStateSerializer.serialize(enumeratorState);
             allStates.put(-1, Collections.singletonList(enumeratorStateBytes));
         }
+        List<SplitT> splitStates = reader.snapshotState(checkpointId);
         if (splitStates != null) {
             final List<byte[]> readerStateBytes = new ArrayList<>(splitStates.size());
             for (SplitT splitState : splitStates) {