Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/19 02:04:06 UTC

[incubator-seatunnel] branch dev updated: [Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6506e306e [Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783)
6506e306e is described below

commit 6506e306ebca5b1cf956dc0f53e4683c907db76f
Author: wangwenzhao <45...@users.noreply.github.com>
AuthorDate: Wed Oct 19 10:04:01 2022 +0800

    [Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783)
---
 docs/en/connector-v2/source/kafka.md               |  29 +++-
 seatunnel-connectors-v2/connector-kafka/pom.xml    |  12 +-
 .../connectors/seatunnel/kafka/config/Config.java  |  26 +++
 .../seatunnel/kafka/source/KafkaSource.java        |  51 +++++-
 .../seatunnel/kafka/source/KafkaSourceReader.java  |  14 +-
 .../connector-kafka-flink-e2e}/pom.xml             |  38 +++--
 .../e2e/flink/v2/kafka/KafkaContainer.java         | 151 +++++++++++++++++
 .../flink/v2/kafka/KafkaSourceJsonToConsoleIT.java | 181 ++++++++++++++++++++
 .../flink/v2/kafka/KafkaSourceTextToConsoleIT.java | 184 +++++++++++++++++++++
 .../kafka/kafkasource_json_to_console.conf         |  91 ++++++++++
 .../kafka/kafkasource_text_to_console.conf         |  92 +++++++++++
 .../src/test/resources/log4j.properties            |  22 +++
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 +
 .../connector-kafka-spark-e2e}/pom.xml             |  32 ++--
 .../e2e/spark/v2/kafka/KafkaContainer.java         | 151 +++++++++++++++++
 .../spark/v2/kafka/KafkaSourceJsonToConsoleIT.java | 180 ++++++++++++++++++++
 .../spark/v2/kafka/KafkaSourceTextToConsoleIT.java | 184 +++++++++++++++++++++
 .../kafka/kafkasource_json_to_console.conf         |  90 ++++++++++
 .../kafka/kafkasource_text_to_console.conf         |  93 +++++++++++
 .../src/test/resources/log4j.properties            |  22 +++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 +
 21 files changed, 1594 insertions(+), 51 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 40b1ae5ab..dc5360d89 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -26,6 +26,8 @@ Source connector for Apache Kafka.
 | commit_on_checkpoint | Boolean | no       | true                     |
 | kafka.*              | String  | no       | -                        |
 | common-options       |         | no       | -                        |
+| schema               |         | no       | -                        |
+| format               | String  | no       | json                     |
 
 ### topic [string]
 
@@ -57,6 +59,13 @@ The way to specify parameters is to add the prefix `kafka.` to the original para
 
 Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
 
+### schema
+The structure of the data, including field names and field types.
+
+### format [string]
+Data format. The default format is json. The text format is also supported; its default field delimiter is ",".
+To use a custom delimiter, set the "field_delimiter" option.
+
 ## Example
 
 ###  Simple
@@ -64,12 +73,22 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 ```hocon
 source {
 
-    Kafka {
-          topic = "seatunnel"
-          bootstrap.servers = "localhost:9092"
-          consumer.group = "seatunnel_group"
+  Kafka {
+    result_table_name = "kafka_name"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
     }
-
+    format = text
+    field_delimiter = "#"
+    topic = "topic_1,topic_2,topic_3"
+    bootstrap.servers = "localhost:9092"
+    kafka.max.poll.records = 500
+    kafka.client.id = client_1
+  }
+  
 }
 ```
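
For contrast with the text-format example above, a minimal json-format source might look like the following (an illustrative sketch; the topic and server values are placeholders, and `format = json` could be omitted since json is the default):

```hocon
source {
  Kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    consumer.group = "seatunnel_group"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = json
  }
}
```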
 
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 4159e8783..3f8fe0ea4 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -28,7 +28,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>connector-kafka</artifactId>
-    
+
     <properties>
         <kafka.client.version>3.2.0</kafka.client.version>
     </properties>
@@ -36,6 +36,11 @@
     <dependencies>
 
         <!-- TODO add to dependency management after version unify-->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
@@ -46,6 +51,11 @@
             <artifactId>seatunnel-format-json</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d577b2bad..2dedcd13d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -50,6 +50,31 @@ public class Config {
      */
     public static final String TRANSACTION_PREFIX = "transaction_prefix";
 
+    /**
+     * User-defined schema
+     */
+    public static final String SCHEMA = "schema";
+
+    /**
+     * data format
+     */
+    public static final String FORMAT = "format";
+
+    /**
+     * The default data format is JSON
+     */
+    public static final String DEFAULT_FORMAT = "json";
+
+    /**
+     * field delimiter
+     */
+    public static final String FIELD_DELIMITER = "field_delimiter";
+
+    /**
+     * The default field delimiter is ","
+     */
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
     /**
      * Send information according to the specified partition.
      */
@@ -64,4 +89,5 @@ public class Config {
      * Determine the key of the kafka send partition
      */
     public static final String PARTITION_KEY = "partition_key";
+
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index a6b6199f5..3c608f4a5 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -20,17 +20,21 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,7 +42,10 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -52,6 +59,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
 
     private final ConsumerMetadata metadata = new ConsumerMetadata();
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private SeaTunnelRowType typeInfo;
     private JobContext jobContext;
 
@@ -92,10 +100,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-                new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(config);
     }
 
     @Override
@@ -105,7 +110,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
 
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+        return new KafkaSourceReader(this.metadata, deserializationSchema, readerContext);
     }
 
     @Override
@@ -122,4 +127,36 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
     }
+
+    private void setDeserialization(Config config) {
+        if (config.hasPath(SCHEMA)) {
+            Config schema = config.getConfig(SCHEMA);
+            typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+            String format = DEFAULT_FORMAT;
+            if (config.hasPath(FORMAT)) {
+                format = config.getString(FORMAT);
+            }
+            if (DEFAULT_FORMAT.equals(format)) {
+                deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+            } else if ("text".equals(format)) {
+                String delimiter = DEFAULT_FIELD_DELIMITER;
+                if (config.hasPath(FIELD_DELIMITER)) {
+                    delimiter = config.getString(FIELD_DELIMITER);
+                }
+                deserializationSchema = TextDeserializationSchema.builder()
+                        .seaTunnelRowType(typeInfo)
+                        .delimiter(delimiter)
+                        .build();
+            } else {
+                // TODO: use format SPI
+                throw new UnsupportedOperationException("Unsupported format: " + format);
+            }
+        } else {
+            typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
+            this.deserializationSchema = TextDeserializationSchema.builder()
+                    .seaTunnelRowType(typeInfo)
+                    .delimiter(String.valueOf('\002'))
+                    .build();
+        }
+    }
 }
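
The `setDeserialization` method above also defines the fallback path: when no `schema` block is configured, the source builds a simple text schema and splits records on the '\002' control character. A minimal config relying on that fallback might look like this (a sketch; the topic and server values are placeholders):

```hocon
source {
  Kafka {
    # No schema block: the source falls back to SeaTunnelSchema.buildSimpleTextSchema()
    # and a TextDeserializationSchema delimited by '\002'
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
  }
}
```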
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index bb5486956..24252ce68 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -59,15 +59,15 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     private final ConcurrentMap<TopicPartition, KafkaSourceSplit> sourceSplitMap;
     private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
     private final ExecutorService executorService;
-    // TODO support user custom type
-    private SeaTunnelRowType typeInfo;
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
-    KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
+    KafkaSourceReader(ConsumerMetadata metadata,
+                      DeserializationSchema<SeaTunnelRow> deserializationSchema,
                       SourceReader.Context context) {
         this.metadata = metadata;
         this.context = context;
-        this.typeInfo = typeInfo;
         this.sourceSplits = new HashSet<>();
+        this.deserializationSchema = deserializationSchema;
         this.consumerThreadMap = new ConcurrentHashMap<>();
         this.sourceSplitMap = new ConcurrentHashMap<>();
         this.checkpointOffsetMap = new ConcurrentHashMap<>();
@@ -114,9 +114,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
                             List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                             for (ConsumerRecord<byte[], byte[]> record : recordList) {
 
-                                String v = stringDeserializer.deserialize(partition.topic(), record.value());
-                                String t = partition.topic();
-                                output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                                deserializationSchema.deserialize(record.value(), output);
 
                                 if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
                                     record.offset() >= sourceSplit.getEndOffset()) {
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
similarity index 59%
copy from seatunnel-connectors-v2/connector-kafka/pom.xml
copy to seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
index 4159e8783..2eccfa9c5 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
@@ -1,50 +1,58 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-kafka</artifactId>
-    
-    <properties>
-        <kafka.client.version>3.2.0</kafka.client.version>
-    </properties>
+    <artifactId>connector-kafka-flink-e2e</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-flink-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
-        <!-- TODO add to dependency management after version unify-->
+        <!-- SeaTunnel connectors -->
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.client.version}</version>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-format-json</artifactId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
new file mode 100644
index 000000000..7d7fe1920
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * This container wraps Confluent Kafka and Zookeeper (optionally)
+ */
+public class KafkaContainer extends GenericContainer<KafkaContainer> {
+
+    private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
+
+    public static final int KAFKA_PORT = 9093;
+
+    public static final int ZOOKEEPER_PORT = 2181;
+
+    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+    protected String externalZookeeperConnect = null;
+
+    public KafkaContainer(final DockerImageName dockerImageName) {
+        super(dockerImageName);
+        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+
+        withExposedPorts(KAFKA_PORT);
+
+        // Use two listeners with different names; this forces Kafka to communicate with itself via the internal
+        // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set. Otherwise Kafka will try to use the advertised listener.
+        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
+        withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+        withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+
+        withEnv("KAFKA_BROKER_ID", "1");
+        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
+        withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+    }
+
+    public KafkaContainer withEmbeddedZookeeper() {
+        externalZookeeperConnect = null;
+        return self();
+    }
+
+    public KafkaContainer withExternalZookeeper(String connectString) {
+        externalZookeeperConnect = connectString;
+        return self();
+    }
+
+    public String getBootstrapServers() {
+        return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT));
+    }
+
+    @Override
+    protected void configure() {
+        withEnv(
+                "KAFKA_ADVERTISED_LISTENERS",
+                String.format("BROKER://%s:9092", getNetwork() != null ? getNetworkAliases().get(1) : "localhost")
+        );
+
+        String command = "#!/bin/bash\n";
+        if (externalZookeeperConnect != null) {
+            withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
+        } else {
+            addExposedPort(ZOOKEEPER_PORT);
+            withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
+            command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
+            command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+            command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+            command += "zookeeper-server-start zookeeper.properties &\n";
+        }
+
+        // Optimization: skip the checks
+        command += "echo '' > /etc/confluent/docker/ensure \n";
+        // Run the original command
+        command += "/etc/confluent/docker/run \n";
+        withCommand("sh", "-c", command);
+    }
+
+    @Override
+    @SneakyThrows
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
+        ExecResult result = execInContainer(
+                "kafka-configs",
+                "--alter",
+                "--bootstrap-server",
+                brokerAdvertisedListener,
+                "--entity-type",
+                "brokers",
+                "--entity-name",
+                getEnvMap().get("KAFKA_BROKER_ID"),
+                "--add-config",
+                "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
+        );
+        if (result.getExitCode() != 0) {
+            throw new IllegalStateException(result.toString());
+        }
+    }
+
+    protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+        return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+    }
+
+    public String getLinuxLocalIp() {
+        String ip = "";
+        try {
+            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
+            while (networkInterfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = networkInterfaces.nextElement();
+                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+                while (inetAddresses.hasMoreElements()) {
+                    InetAddress inetAddress = inetAddresses.nextElement();
+                    if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+                        ip = inetAddress.getHostAddress();
+                    }
+                }
+            }
+        } catch (SocketException ex) {
+            ex.printStackTrace();
+        }
+        return ip;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
new file mode 100644
index 000000000..e3468e8dd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that data read from the kafka source can be written to the console.
+ * Make sure the SeaTunnel job can submit successfully on the Flink engine.
+ */
+@Slf4j
+public class KafkaSourceJsonToConsoleIT extends FlinkContainer {
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData() {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+                new String[]{
+                        "id",
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                },
+                new SeaTunnelDataType[]{
+                        BasicType.LONG_TYPE,
+                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                        ArrayType.BYTE_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(2, 1),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                }
+        );
+
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                            Long.valueOf(i),
+                            Collections.singletonMap("key", Short.parseShort("1")),
+                            new Byte[]{Byte.parseByte("1")},
+                            "string",
+                            Boolean.FALSE,
+                            Byte.parseByte("1"),
+                            Short.parseShort("1"),
+                            Integer.parseInt("1"),
+                            Long.parseLong("1"),
+                            Float.parseFloat("1.1"),
+                            Double.parseDouble("1.1"),
+                            BigDecimal.valueOf(11, 1),
+                            "test".getBytes(),
+                            LocalDate.now(),
+                            LocalDateTime.now()
+                    });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafkaSourceJsonToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_json_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
new file mode 100644
index 000000000..f0855b455
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case is used to verify that data read from the kafka source can be written to the console.
+ * Make sure the SeaTunnel job can submit successfully on the Flink engine.
+ */
+@Slf4j
+public class KafkaSourceTextToConsoleIT extends FlinkContainer {
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData() {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+                new String[]{
+                        "id",
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                },
+                new SeaTunnelDataType[]{
+                        BasicType.LONG_TYPE,
+                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                        ArrayType.BYTE_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(2, 1),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                });
+
+        TextSerializationSchema serializationSchema = TextSerializationSchema.builder()
+                .seaTunnelRowType(seatunnelRowType)
+                .delimiter(",")
+                .build();
+
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                            Long.valueOf(i),
+                            Collections.singletonMap("key", Short.parseShort("1")),
+                            new Byte[]{Byte.parseByte("1")},
+                            "string",
+                            Boolean.FALSE,
+                            Byte.parseByte("1"),
+                            Short.parseShort("1"),
+                            Integer.parseInt("1"),
+                            Long.parseLong("1"),
+                            Float.parseFloat("1.1"),
+                            Double.parseDouble("1.1"),
+                            BigDecimal.valueOf(11, 1),
+                            "test".getBytes(),
+                            LocalDate.now(),
+                            LocalDateTime.now()
+                    });
+
+            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafkaSourceTextToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
new file mode 100644
index 000000000..62a1dc967
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    schema = {
+      fields {
+           id = bigint
+           c_map = "map<string, smallint>"
+           c_array = "array<tinyint>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_decimal = "decimal(2, 1)"
+           c_bytes = bytes
+           c_date = date
+           c_timestamp = timestamp
+      }
+    }
+    # The default format is json, so this option can be omitted
+    format = json
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
new file mode 100644
index 000000000..c1b3c0d47
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
+    schema = {
+      fields {
+           id = bigint
+           c_map = "map<string, smallint>"
+           c_array = "array<tinyint>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_decimal = "decimal(2, 1)"
+           c_bytes = bytes
+           c_date = date
+           c_timestamp = timestamp
+      }
+    }
+    format = text
+    # The default field delimiter is ","
+    field_delimiter = ","
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 95d24c889..aea126d7f 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,7 @@
         <module>connector-mongodb-flink-e2e</module>
         <module>connector-iceberg-flink-e2e</module>
         <module>connector-influxdb-flink-e2e</module>
+        <module>connector-kafka-flink-e2e</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
similarity index 65%
copy from seatunnel-connectors-v2/connector-kafka/pom.xml
copy to seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
index 4159e8783..5f92a2599 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
@@ -1,50 +1,52 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-kafka</artifactId>
-    
-    <properties>
-        <kafka.client.version>3.2.0</kafka.client.version>
-    </properties>
+    <artifactId>connector-kafka-spark-e2e</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-spark-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
-        <!-- TODO add to dependency management after version unify-->
+        <!-- SeaTunnel connectors -->
         <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.client.version}</version>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-format-json</artifactId>
+            <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
new file mode 100644
index 000000000..431df9205
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * This container wraps Confluent Kafka and, optionally, an embedded Zookeeper.
+ */
+public class KafkaContainer extends GenericContainer<KafkaContainer> {
+
+    private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
+
+    public static final int KAFKA_PORT = 9093;
+
+    public static final int ZOOKEEPER_PORT = 2181;
+
+    private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
+
+    protected String externalZookeeperConnect = null;
+
+    public KafkaContainer(final DockerImageName dockerImageName) {
+        super(dockerImageName);
+        dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+
+        withExposedPorts(KAFKA_PORT);
+
+        // Use two listeners with different names: when KAFKA_INTER_BROKER_LISTENER_NAME is set, Kafka is forced to
+        // communicate with itself via the internal listener; otherwise it would try to use the advertised listener
+        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
+        withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
+        withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+
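+        // Single-broker defaults: replication factor 1 for the internal topics, no forced log flushes,
+        // and no consumer-group rebalance delay, so tests start consuming immediately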
+        withEnv("KAFKA_BROKER_ID", "1");
+        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
+        withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
+        withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
+    }
+
+    public KafkaContainer withEmbeddedZookeeper() {
+        externalZookeeperConnect = null;
+        return self();
+    }
+
+    public KafkaContainer withExternalZookeeper(String connectString) {
+        externalZookeeperConnect = connectString;
+        return self();
+    }
+
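+    // Advertises the host's non-loopback IPv4 address plus the mapped port, so clients running
+    // outside the Docker network (e.g. the test JVM) can reach the broker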
+    public String getBootstrapServers() {
+        return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT));
+    }
+
+    @Override
+    protected void configure() {
+        withEnv(
+                "KAFKA_ADVERTISED_LISTENERS",
+                String.format("BROKER://%s:9092", getNetwork() != null ? getNetworkAliases().get(1) : "localhost")
+        );
+
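+        // Build a startup script: launch an embedded Zookeeper unless an external one was configured, then run Kafka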
+        String command = "#!/bin/bash\n";
+        if (externalZookeeperConnect != null) {
+            withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
+        } else {
+            addExposedPort(ZOOKEEPER_PORT);
+            withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
+            command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
+            command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
+            command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
+            command += "zookeeper-server-start zookeeper.properties &\n";
+        }
+
+        // Optimization: skip the checks
+        command += "echo '' > /etc/confluent/docker/ensure \n";
+        // Run the original command
+        command += "/etc/confluent/docker/run \n";
+        withCommand("sh", "-c", command);
+    }
+
+    @Override
+    @SneakyThrows
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
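+        // After startup, append the externally reachable bootstrap address to the broker's
+        // advertised.listeners via kafka-configs, so both in-network and host clients can connect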
+        String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
+        ExecResult result = execInContainer(
+                "kafka-configs",
+                "--alter",
+                "--bootstrap-server",
+                brokerAdvertisedListener,
+                "--entity-type",
+                "brokers",
+                "--entity-name",
+                getEnvMap().get("KAFKA_BROKER_ID"),
+                "--add-config",
+                "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
+        );
+        if (result.getExitCode() != 0) {
+            throw new IllegalStateException(result.toString());
+        }
+    }
+
+    protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
+        return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
+    }
+
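+    // Returns the last non-loopback IPv4 address found on the local interfaces,
+    // or an empty string if none exists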
+    public String getLinuxLocalIp() {
+        String ip = "";
+        try {
+            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
+            while (networkInterfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = networkInterfaces.nextElement();
+                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+                while (inetAddresses.hasMoreElements()) {
+                    InetAddress inetAddress = inetAddresses.nextElement();
+                    if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+                        ip = inetAddress.getHostAddress();
+                    }
+                }
+            }
+        } catch (SocketException ex) {
+            ex.printStackTrace();
+        }
+        return ip;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
new file mode 100644
index 000000000..8b9a425a9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case verifies that the Kafka source can read JSON-formatted data and write it to the console.
+ * It also makes sure the SeaTunnel job can submit successfully on the Spark engine.
+ */
+@Slf4j
+public class KafkaSourceJsonToConsoleIT extends SparkContainer {
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
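+        // Retry producer creation until it succeeds; the broker may not be reachable immediately after startup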
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData() {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+                new String[]{
+                        "id",
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                },
+                new SeaTunnelDataType[]{
+                        BasicType.LONG_TYPE,
+                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                        ArrayType.BYTE_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(2, 1),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                }
+        );
+
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+
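+        // Produce 100 rows covering every supported SeaTunnel data type; the serializer
+        // encodes each row as a JSON message for the "test_topic" topic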
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                            Long.valueOf(i),
+                            Collections.singletonMap("key", Short.parseShort("1")),
+                            new Byte[]{Byte.parseByte("1")},
+                            "string",
+                            Boolean.FALSE,
+                            Byte.parseByte("1"),
+                            Short.parseShort("1"),
+                            Integer.parseInt("1"),
+                            Long.parseLong("1"),
+                            Float.parseFloat("1.1"),
+                            Double.parseDouble("1.1"),
+                            BigDecimal.valueOf(11, 1),
+                            "test".getBytes(),
+                            LocalDate.now(),
+                            LocalDateTime.now()
+                    });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafkaSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_json_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
new file mode 100644
index 000000000..c2842557c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * This test case verifies that the Kafka source can read text-formatted data and write it to the console.
+ * It also makes sure the SeaTunnel job can submit successfully on the Spark engine.
+ */
+@Slf4j
+public class KafkaSourceTextToConsoleIT extends SparkContainer {
+
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
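+        // Retry producer creation until it succeeds; the broker may not be reachable immediately after startup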
+        Awaitility.given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData() {
+
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+                new String[]{
+                        "id",
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                },
+                new SeaTunnelDataType[]{
+                        BasicType.LONG_TYPE,
+                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                        ArrayType.BYTE_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(2, 1),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                });
+
+        TextSerializationSchema serializationSchema = TextSerializationSchema.builder()
+                .seaTunnelRowType(seatunnelRowType)
+                .delimiter(",")
+                .build();
+
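+        // Produce 100 rows; each row is serialized as comma-delimited text for the "test_topic" topic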
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                    new Object[]{
+                            Long.valueOf(i),
+                            Collections.singletonMap("key", Short.parseShort("1")),
+                            new Byte[]{Byte.parseByte("1")},
+                            "string",
+                            Boolean.FALSE,
+                            Byte.parseByte("1"),
+                            Short.parseShort("1"),
+                            Integer.parseInt("1"),
+                            Long.parseLong("1"),
+                            Float.parseFloat("1.1"),
+                            Double.parseDouble("1.1"),
+                            BigDecimal.valueOf(11, 1),
+                            "test".getBytes(),
+                            LocalDate.now(),
+                            LocalDateTime.now()
+                    });
+
+            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
+            producer.send(producerRecord);
+        }
+    }
+
+    @Test
+    public void testKafkaSourceTextToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    private void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
new file mode 100644
index 000000000..91c920237
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    source.parallelism = 1
+    job.mode = "BATCH"
+}
+
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
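+    # format defaults to json, so each message value is parsed as a JSON object against the schema below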
+    schema = {
+      fields {
+           id = bigint
+           c_map = "map<string, smallint>"
+           c_array = "array<tinyint>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_decimal = "decimal(2, 1)"
+           c_bytes = bytes
+           c_date = date
+           c_timestamp = timestamp
+      }
+    }
+  }
+
+  # If you would like more information about how to configure SeaTunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {}
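+  # Assert that all 100 generated ids arrived: not null and within [0, 99]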
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
new file mode 100644
index 000000000..369f34a2f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    source.parallelism = 1
+    job.mode = "BATCH"
+}
+
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    kafka.auto.offset.reset = "earliest"
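+    # The schema below maps each delimited text field, in order, to a typed SeaTunnel column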
+    schema = {
+      fields {
+           id = bigint
+           c_map = "map<string, smallint>"
+           c_array = "array<tinyint>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_decimal = "decimal(2, 1)"
+           c_bytes = bytes
+           c_date = date
+           c_timestamp = timestamp
+      }
+    }
+    format = text
+    # The default field delimiter is ","
+    field_delimiter = ","
+  }
+
+  # If you would like more information about how to configure SeaTunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Console {}
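+  # Assert that all 100 generated ids arrived: not null and within [0, 99]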
+  Assert {
+    rules = {
+      field_rules = [
+        {
+          field_name = id
+          field_type = long
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            },
+            {
+              rule_type = MIN
+              rule_value = 0
+            },
+            {
+              rule_type = MAX
+              rule_value = 99
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 64fc92056..885905feb 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-iotdb-spark-e2e</module>
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-mongodb-spark-e2e</module>
+        <module>connector-kafka-spark-e2e</module>
         <module>connector-influxdb-spark-e2e</module>
     </modules>