You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/21 06:33:10 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-v2] Neo4j source connector (#2777)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38b0daf8b [Feature][Connector-v2] Neo4j source connector (#2777)
38b0daf8b is described below

commit 38b0daf8b7ba723f2ee18d1e0ab55073701c1b74
Author: Namgung Chan <33...@users.noreply.github.com>
AuthorDate: Fri Oct 21 15:33:03 2022 +0900

    [Feature][Connector-v2] Neo4j source connector (#2777)
---
 docs/en/connector-v2/source/Neo4j.md               | 100 ++++++++++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2/connector-neo4j/pom.xml    |   5 +
 .../{Neo4jConfig.java => Neo4jCommonConfig.java}   |  12 +-
 .../{Neo4jConfig.java => Neo4jSinkConfig.java}     |  18 +--
 .../seatunnel/neo4j/config/Neo4jSourceConfig.java  |  22 ++++
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java |  37 +++---
 .../seatunnel/neo4j/sink/Neo4jSinkWriter.java      |  10 +-
 .../Neo4jSink.java => source/Neo4jSource.java}     |  94 +++++++-------
 .../seatunnel/neo4j/source/Neo4jSourceReader.java  | 143 +++++++++++++++++++++
 .../src/main/resources/examples/fake_to_neo4j.conf |  63 ---------
 .../Neo4jSourceReaderTest.java                     |  73 +++++++++++
 .../connector-neo4j-flink-e2e/pom.xml              |  59 +++++++++
 .../seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java      | 141 ++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  22 ++++
 .../src/test/resources/neo4j/neo4j_to_neo4j.conf   |  90 +++++++++++++
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 +
 .../{ => connector-neo4j-spark-e2e}/pom.xml        |  53 +++++---
 .../seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java      | 139 ++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  23 ++++
 .../src/test/resources/neo4j/neo4j_to_neo4j.conf   |  90 +++++++++++++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 +
 22 files changed, 1014 insertions(+), 183 deletions(-)

diff --git a/docs/en/connector-v2/source/Neo4j.md b/docs/en/connector-v2/source/Neo4j.md
new file mode 100644
index 000000000..75026cd7b
--- /dev/null
+++ b/docs/en/connector-v2/source/Neo4j.md
@@ -0,0 +1,100 @@
+# Neo4j
+
+> Neo4j source connector
+
+## Description
+
+Read data from Neo4j.
+
+`neo4j-java-driver` version 4.4.9
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type   | required | default value |
+|----------------------------|--------|----------|---------------|
+| uri                        | String | Yes      | -             |
+| username                   | String | No       | -             |
+| password                   | String | No       | -             |
+| bearer_token               | String | No       | -             |
+| kerberos_ticket            | String | No       | -             |
+| database                   | String | Yes      | -             |
+| query                      | String | Yes      | -             |
+| schema.fields              | Object | Yes      | -             |
+| max_transaction_retry_time | Long   | No       | 30            |
+| max_connection_timeout     | Long   | No       | 30            |
+
+### uri [string]
+
+The URI of the Neo4j database, for example: `neo4j://localhost:7687`
+
+### username [string]
+
+Username used to authenticate with Neo4j.
+
+### password [string]
+
+Password used to authenticate with Neo4j. Required if `username` is provided.
+
+### bearer_token [string]
+
+Base64-encoded bearer token used to authenticate with Neo4j.
+
+### kerberos_ticket [string]
+
+Base64-encoded Kerberos ticket used to authenticate with Neo4j.
+
+### database [string]
+
+database name.
+
+### query [string]
+
+Query statement.
+
+### schema.fields [object]
+
+The fields returned by `query`, each mapped to its data type (see the example below).
+
+see [schema projection](../../concept/connector-v2-features.md)
+
+### max_transaction_retry_time [long]
+
+Maximum transaction retry time, in seconds. The transaction fails if this time is exceeded.
+
+### max_connection_timeout [long]
+
+The maximum amount of time to wait for a TCP connection to be established (seconds)
+
+## Example
+
+```
+source {
+    Neo4j {
+        uri = "neo4j://localhost:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+    
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+    
+        query = "MATCH (a:Person) RETURN a.name, a.age"
+    
+        schema {
+            fields {
+                a.age=INT
+                a.name=STRING
+            }
+        }
+    }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index fa70c6ea7..c2ce92848 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -119,6 +119,7 @@ seatunnel.sink.DingTalk = connector-dingtalk
 seatunnel.sink.elasticsearch = connector-elasticsearch
 seatunnel.source.IoTDB = connector-iotdb
 seatunnel.sink.IoTDB = connector-iotdb
+seatunnel.source.Neo4j = connector-neo4j
 seatunnel.sink.Neo4j = connector-neo4j
 seatunnel.source.FtpFile = connector-file-ftp
 seatunnel.sink.FtpFile = connector-file-ftp
diff --git a/seatunnel-connectors-v2/connector-neo4j/pom.xml b/seatunnel-connectors-v2/connector-neo4j/pom.xml
index 40ac9188f..da3d649fe 100644
--- a/seatunnel-connectors-v2/connector-neo4j/pom.xml
+++ b/seatunnel-connectors-v2/connector-neo4j/pom.xml
@@ -37,6 +37,11 @@
             <artifactId>neo4j-java-driver</artifactId>
             <version>${neo4j-java-driver.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
similarity index 85%
copy from seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
copy to seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
index 660437286..e5a828137 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jCommonConfig.java
@@ -21,12 +21,10 @@ import lombok.Getter;
 import lombok.Setter;
 
 import java.io.Serializable;
-import java.util.Map;
 
 @Getter
 @Setter
-public class Neo4jConfig implements Serializable {
-
+public abstract class Neo4jCommonConfig implements Serializable {
     public static final String PLUGIN_NAME = "Neo4j";
     public static final String KEY_NEO4J_URI = "uri";
     public static final String KEY_USERNAME = "username";
@@ -36,13 +34,9 @@ public class Neo4jConfig implements Serializable {
 
     public static final String KEY_DATABASE = "database";
     public static final String KEY_QUERY = "query";
-    public static final String KEY_QUERY_PARAM_POSITION = "queryParamPosition";
     public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
     public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
 
-
-    private DriverBuilder driverBuilder;
-    private String query;
-    private Map<String, Object> queryParamPosition;
-
+    protected DriverBuilder driverBuilder;
+    protected String query;
 }
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
similarity index 57%
rename from seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
rename to seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
index 660437286..b8909d7fc 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSinkConfig.java
@@ -20,29 +20,13 @@ package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 import lombok.Getter;
 import lombok.Setter;
 
-import java.io.Serializable;
 import java.util.Map;
 
 @Getter
 @Setter
-public class Neo4jConfig implements Serializable {
-
-    public static final String PLUGIN_NAME = "Neo4j";
-    public static final String KEY_NEO4J_URI = "uri";
-    public static final String KEY_USERNAME = "username";
-    public static final String KEY_PASSWORD = "password";
-    public static final String KEY_BEARER_TOKEN = "bearer_token";
-    public static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded
-
-    public static final String KEY_DATABASE = "database";
-    public static final String KEY_QUERY = "query";
+public class Neo4jSinkConfig extends Neo4jCommonConfig {
     public static final String KEY_QUERY_PARAM_POSITION = "queryParamPosition";
-    public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
-    public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
-
 
-    private DriverBuilder driverBuilder;
-    private String query;
     private Map<String, Object> queryParamPosition;
 
 }
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
new file mode 100644
index 000000000..f9eb6d566
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+
+public class Neo4jSourceConfig extends Neo4jCommonConfig {
+    // Intentionally empty: the source declares no members beyond those inherited from Neo4jCommonConfig.
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index 2285ef064..6aff6d2ee 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -17,17 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_BEARER_TOKEN;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_KERBEROS_TICKET;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_NEO4J_URI;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY_PARAM_POSITION;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.PLUGIN_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_BEARER_TOKEN;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_KERBEROS_TICKET;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_NEO4J_URI;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_QUERY;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_QUERY_PARAM_POSITION;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.KEY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -39,7 +39,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -53,7 +53,7 @@ import java.net.URI;
 public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void> {
 
     private SeaTunnelRowType rowType;
-    private final Neo4jConfig neo4jConfig = new Neo4jConfig();
+    private final Neo4jSinkConfig neo4JSinkConfig = new Neo4jSinkConfig();
 
     @Override
     public String getPluginName() {
@@ -62,14 +62,14 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
 
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        neo4jConfig.setDriverBuilder(prepareDriver(config));
+        neo4JSinkConfig.setDriverBuilder(prepareDriver(config));
 
         final CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_QUERY, KEY_QUERY_PARAM_POSITION);
         if (!queryConfigCheck.isSuccess()) {
             throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, queryConfigCheck.getMsg());
         }
-        neo4jConfig.setQuery(config.getString(KEY_QUERY));
-        neo4jConfig.setQueryParamPosition(config.getObject(KEY_QUERY_PARAM_POSITION).unwrapped());
+        neo4JSinkConfig.setQuery(config.getString(KEY_QUERY));
+        neo4JSinkConfig.setQueryParamPosition(config.getObject(KEY_QUERY_PARAM_POSITION).unwrapped());
 
     }
 
@@ -82,9 +82,6 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
         }
 
         final URI uri = URI.create(config.getString(KEY_NEO4J_URI));
-        if (!"neo4j".equals(uri.getScheme())) {
-            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, "uri scheme is not `neo4j`");
-        }
 
         final DriverBuilder driverBuilder = DriverBuilder.create(uri);
 
@@ -132,7 +129,7 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
 
     @Override
     public SinkWriter<SeaTunnelRow, Void, Void> createWriter(SinkWriter.Context context) throws IOException {
-        return new Neo4jSinkWriter(neo4jConfig);
+        return new Neo4jSinkWriter(neo4JSinkConfig);
     }
 
 }
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
index 74214c9eb..daeca04f2 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig;
 
 import lombok.extern.slf4j.Slf4j;
 import org.neo4j.driver.Driver;
@@ -35,14 +35,14 @@ import java.util.stream.Collectors;
 @Slf4j
 public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
 
-    private final Neo4jConfig config;
+    private final Neo4jSinkConfig config;
     private final transient Driver driver;
     private final transient Session session;
 
-    public Neo4jSinkWriter(Neo4jConfig neo4jConfig) {
-        this.config = neo4jConfig;
+    public Neo4jSinkWriter(Neo4jSinkConfig neo4JSinkConfig) {
+        this.config = neo4JSinkConfig;
         this.driver = config.getDriverBuilder().build();
-        this.session = driver.session(SessionConfig.forDatabase(neo4jConfig.getDriverBuilder().getDatabase()));
+        this.session = driver.session(SessionConfig.forDatabase(neo4JSinkConfig.getDriverBuilder().getDatabase()));
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
similarity index 65%
copy from seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
copy to seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index 2285ef064..dc52b91ad 100644
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -15,45 +15,47 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
-
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_BEARER_TOKEN;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_DATABASE;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_KERBEROS_TICKET;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_CONNECTION_TIMEOUT;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_NEO4J_URI;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY_PARAM_POSITION;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_USERNAME;
-import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.PLUGIN_NAME;
+package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_BEARER_TOKEN;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_KERBEROS_TICKET;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_NEO4J_URI;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_QUERY;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
-import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 import org.neo4j.driver.AuthTokens;
 
-import java.io.IOException;
 import java.net.URI;
 
-@AutoService(SeaTunnelSink.class)
-public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void> {
+@AutoService(SeaTunnelSource.class)
+public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
+    private final Neo4jSourceConfig neo4jSourceConfig = new Neo4jSourceConfig();
     private SeaTunnelRowType rowType;
-    private final Neo4jConfig neo4jConfig = new Neo4jConfig();
 
     @Override
     public String getPluginName() {
@@ -61,16 +63,31 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        neo4jConfig.setDriverBuilder(prepareDriver(config));
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        neo4jSourceConfig.setDriverBuilder(prepareDriver(pluginConfig));
 
-        final CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_QUERY, KEY_QUERY_PARAM_POSITION);
-        if (!queryConfigCheck.isSuccess()) {
-            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, queryConfigCheck.getMsg());
+        final CheckResult configCheck = CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY, SeaTunnelSchema.SCHEMA);
+        if (!configCheck.isSuccess()) {
+            throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, configCheck.getMsg());
         }
-        neo4jConfig.setQuery(config.getString(KEY_QUERY));
-        neo4jConfig.setQueryParamPosition(config.getObject(KEY_QUERY_PARAM_POSITION).unwrapped());
+        neo4jSourceConfig.setQuery(pluginConfig.getString(KEY_QUERY));
 
+        this.rowType = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(SeaTunnelSchema.SCHEMA)).getSeaTunnelRowType();
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowType;
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
+        return new Neo4jSourceReader(readerContext, neo4jSourceConfig, rowType);
     }
 
     private DriverBuilder prepareDriver(Config config) {
@@ -78,20 +95,17 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
         final CheckResult authConfigCheck = CheckConfigUtil.checkAtLeastOneExists(config, KEY_USERNAME, KEY_BEARER_TOKEN, KEY_KERBEROS_TICKET);
         final CheckResult mergedConfigCheck = CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
         if (!mergedConfigCheck.isSuccess()) {
-            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, mergedConfigCheck.getMsg());
+            throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, mergedConfigCheck.getMsg());
         }
 
         final URI uri = URI.create(config.getString(KEY_NEO4J_URI));
-        if (!"neo4j".equals(uri.getScheme())) {
-            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, "uri scheme is not `neo4j`");
-        }
 
         final DriverBuilder driverBuilder = DriverBuilder.create(uri);
 
         if (config.hasPath(KEY_USERNAME)) {
             final CheckResult pwParamCheck = CheckConfigUtil.checkAllExists(config, KEY_PASSWORD);
             if (!mergedConfigCheck.isSuccess()) {
-                throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, pwParamCheck.getMsg());
+                throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, pwParamCheck.getMsg());
             }
             final String username = config.getString(KEY_USERNAME);
             final String password = config.getString(KEY_PASSWORD);
@@ -119,20 +133,4 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
 
         return driverBuilder;
     }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.rowType = seaTunnelRowType;
-    }
-
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.rowType;
-    }
-
-    @Override
-    public SinkWriter<SeaTunnelRow, Void, Void> createWriter(SinkWriter.Context context) throws IOException {
-        return new Neo4jSinkWriter(neo4jConfig);
-    }
-
 }
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java
new file mode 100644
index 000000000..16b269b2d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
+
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Query;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.exceptions.value.LossyCoercion;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Objects;
+
+public class Neo4jSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+    // Runs the configured query once and emits every result record as a SeaTunnelRow.
+    private final SingleSplitReaderContext context;
+    private final Neo4jSourceConfig config;
+    private final SeaTunnelRowType rowType;
+    private final Driver driver;
+    private Session session;
+
+    public Neo4jSourceReader(SingleSplitReaderContext context, Neo4jSourceConfig config, SeaTunnelRowType rowType) {
+        this.context = context;
+        this.config = config;
+        this.driver = config.getDriverBuilder().build(); // driver is created here; the session is opened in open()
+        this.rowType = rowType;
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.session = driver.session(SessionConfig.forDatabase(config.getDriverBuilder().getDatabase()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        session.close(); // NOTE(review): session is only assigned in open(); confirm open() always precedes close()
+        driver.close();
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        final Query query = new Query(config.getQuery());
+        session.readTransaction(tx -> {
+            final Result result = tx.run(query);
+            result.stream()
+                .forEach(row -> {
+                    final Object[] fields = new Object[rowType.getTotalFields()];
+                    for (int i = 0; i < rowType.getTotalFields(); i++) { // one output column per schema field, looked up by name
+                        final String fieldName = rowType.getFieldName(i);
+                        final SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
+                        final Value value = row.get(fieldName);
+                        fields[i] = convertType(fieldType, value);
+                    }
+                    output.collect(new SeaTunnelRow(fields));
+                });
+            return null; // nothing to return from the transaction callback
+        });
+        this.context.signalNoMoreElement(); // every record was collected above, so signal that no more elements follow
+    }
+
+    /**
+     * Convert a Neo4j driver {@link Value} to the Java object matching the given {@link SeaTunnelDataType}.
+     *
+     * @throws IllegalArgumentException when the data type is not supported, or a MAP key type is not STRING
+     * @throws LossyCoercion            when conversion cannot be achieved without losing precision.
+     */
+    public static Object convertType(SeaTunnelDataType<?> dataType, Value value) throws IllegalArgumentException, LossyCoercion {
+        Objects.requireNonNull(dataType);
+        Objects.requireNonNull(value);
+
+        switch (dataType.getSqlType()) {
+            case STRING:
+                return value.asString();
+            case BOOLEAN:
+                return value.asBoolean();
+            case BIGINT:
+                return value.asLong();
+            case DOUBLE:
+                return value.asDouble();
+            case NULL:
+                return null; // NULL-typed field: the driver value is not read
+            case BYTES:
+                return value.asByteArray();
+            case DATE:
+                return value.asLocalDate();
+            case TIME:
+                return value.asLocalTime();
+            case TIMESTAMP:
+                return value.asLocalDateTime();
+            case MAP:
+                if (!((MapType<?, ?>) dataType).getKeyType().equals(BasicType.STRING_TYPE)) {
+                    throw new IllegalArgumentException("Key Type of MapType must String type");
+                }
+                final SeaTunnelDataType<?> valueType = ((MapType<?, ?>) dataType).getValueType();
+                return value.asMap(v -> valueType.getTypeClass().cast(convertType(valueType, v))); // values converted recursively
+            case ARRAY:
+                final BasicType<?> elementType = ((ArrayType<?, ?>) dataType).getElementType();
+                final List<?> list = value.asList(v -> elementType.getTypeClass().cast(convertType(elementType, v)));
+                final Object array = Array.newInstance(elementType.getTypeClass(), list.size()); // array typed by the element's Java class
+                for (int i = 0; i < list.size(); i++) {
+                    Array.set(array, i, list.get(i));
+                }
+                return array;
+            case INT:
+                return value.asInt();
+            case FLOAT:
+                return value.asFloat();
+            default:
+                throw new IllegalArgumentException("not supported data type: " + dataType);
+        }
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf b/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf
deleted file mode 100644
index 3938a749b..000000000
--- a/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
-  # You can set flink configuration here
-  execution.parallelism = 1
-  #job.mode = "STREAM"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
-  # This is a example source plugin **only for test and demonstrate the feature source plugin**
-    FakeSource {
-      result_table_name = "fake"
-      field_name = "name,age"
-    }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
-}
-
-transform {
-
-}
-
-sink {
-  Neo4j {
-    uri = "neo4j://localhost:7687"
-    username = "neo4j"
-    password = "1234"
-    database = "neo4j"
-
-    max_transaction_retry_time = 1
-    max_connection_timeout = 1
-
-    query = "CREATE (a:Person {name: $name, age: $age})"
-    queryParamPosition = {
-        name = 0
-        age = 1
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java b/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java
new file mode 100644
index 000000000..c3f511751
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
+
+import static org.apache.seatunnel.api.table.type.ArrayType.STRING_ARRAY_TYPE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.exceptions.value.LossyCoercion;
+import org.neo4j.driver.internal.value.BooleanValue;
+import org.neo4j.driver.internal.value.BytesValue;
+import org.neo4j.driver.internal.value.DateValue;
+import org.neo4j.driver.internal.value.FloatValue;
+import org.neo4j.driver.internal.value.IntegerValue;
+import org.neo4j.driver.internal.value.ListValue;
+import org.neo4j.driver.internal.value.LocalDateTimeValue;
+import org.neo4j.driver.internal.value.LocalTimeValue;
+import org.neo4j.driver.internal.value.MapValue;
+import org.neo4j.driver.internal.value.NullValue;
+import org.neo4j.driver.internal.value.StringValue;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Collections;
+
+/**
+ * Unit test for {@code Neo4jSourceReader.convertType}, which turns a Neo4j driver value into the
+ * Java object matching a SeaTunnel data type.
+ */
+class Neo4jSourceReaderTest {
+    /**
+     * Checks one conversion per data type exercised here, followed by the inputs that are
+     * expected to be rejected.
+     */
+    @Test
+    void convertType() {
+        // scalar types
+        assertEquals("test", Neo4jSourceReader.convertType(BasicType.STRING_TYPE, new StringValue("test")));
+        assertEquals(true, Neo4jSourceReader.convertType(BasicType.BOOLEAN_TYPE, BooleanValue.TRUE));
+        assertEquals(1L, Neo4jSourceReader.convertType(BasicType.LONG_TYPE, new IntegerValue(1L)));
+        assertEquals(1.5, Neo4jSourceReader.convertType(BasicType.DOUBLE_TYPE, new FloatValue(1.5)));
+        assertNull(Neo4jSourceReader.convertType(BasicType.VOID_TYPE, NullValue.NULL));
+        assertEquals(1, Neo4jSourceReader.convertType(BasicType.INT_TYPE, new IntegerValue(1)));
+        assertEquals(1.1F, Neo4jSourceReader.convertType(BasicType.FLOAT_TYPE, new FloatValue(1.1F)));
+
+        // binary: compare the whole array, not only its first element, so that a result with a
+        // wrong length or trailing bytes fails the test
+        assertArrayEquals(new byte[]{(byte) 1},
+            (byte[]) Neo4jSourceReader.convertType(PrimitiveByteArrayType.INSTANCE, new BytesValue(new byte[]{(byte) 1})));
+
+        // temporal types
+        assertEquals(LocalDate.MIN, Neo4jSourceReader.convertType(LocalTimeType.LOCAL_DATE_TYPE, new DateValue(LocalDate.MIN)));
+        assertEquals(LocalTime.MIN, Neo4jSourceReader.convertType(LocalTimeType.LOCAL_TIME_TYPE, new LocalTimeValue(LocalTime.MIN)));
+        assertEquals(LocalDateTime.MIN, Neo4jSourceReader.convertType(LocalTimeType.LOCAL_DATE_TIME_TYPE, new LocalDateTimeValue(LocalDateTime.MIN)));
+
+        // container types
+        assertEquals(Collections.singletonMap("1", false),
+            Neo4jSourceReader.convertType(new MapType<>(BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE), new MapValue(Collections.singletonMap("1", BooleanValue.FALSE))));
+        assertArrayEquals(new Object[]{"foo", "bar"},
+            (Object[]) Neo4jSourceReader.convertType(STRING_ARRAY_TYPE, new ListValue(new StringValue("foo"), new StringValue("bar"))));
+
+        // rejected inputs: SHORT is expected to be an unsupported data type
+        assertThrows(IllegalArgumentException.class, () -> Neo4jSourceReader.convertType(BasicType.SHORT_TYPE, new IntegerValue(256)));
+        // a value one above Integer.MAX_VALUE cannot be read as INT without losing information
+        assertThrows(LossyCoercion.class, () -> Neo4jSourceReader.convertType(BasicType.INT_TYPE, new IntegerValue(Integer.MAX_VALUE + 1L)));
+        // a MAP whose key type is not STRING is rejected
+        assertThrows(IllegalArgumentException.class, () -> Neo4jSourceReader.convertType(new MapType<>(BasicType.INT_TYPE, BasicType.BOOLEAN_TYPE), new MapValue(Collections.singletonMap("1", BooleanValue.FALSE))));
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/pom.xml
new file mode 100644
index 000000000..9b040ccdc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project
+        xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-neo4j-flink-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-flink-e2e-base</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-neo4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java
new file mode 100644
index 000000000..0c489a788
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.neo4j;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.neo4j.driver.Values.parameters;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.types.Node;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test of the Neo4j source and sink connectors on the Flink engine.
+ *
+ * <p>A Neo4j container is started once for the class. The test seeds a {@code :Test} node, runs the
+ * {@code /neo4j/neo4j_to_neo4j.conf} job and verifies the {@code :TestTest} node written by the sink.
+ */
+@Slf4j
+public class Neo4jIT extends FlinkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    // network alias under which the job configuration addresses the database
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    // URI used by the test itself; it relies on the fixed host port binding set up in init()
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    /**
+     * Starts the Neo4j container and waits (at most 30 seconds) until a verified driver
+     * connection to it could be established.
+     */
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withExposedPorts(CONTAINER_PORT)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
+        // bind the Bolt port to the same port on the host so that CONTAINER_URI is reachable
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", CONTAINER_PORT, CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("container started");
+        // initConnection() throws while the database is not reachable yet; retry until it succeeds
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+    }
+
+    /**
+     * Opens the driver and session used by the test.
+     *
+     * <p>Creating a driver does not by itself prove that the server is reachable, so connectivity
+     * is verified explicitly. On failure the driver is closed before the exception is rethrown,
+     * so that the retries performed by {@link #init()} do not leak drivers.
+     */
+    private void initConnection() {
+        final Driver driver = GraphDatabase.driver(CONTAINER_URI, AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
+        try {
+            driver.verifyConnectivity();
+        } catch (RuntimeException e) {
+            driver.close();
+            throw e;
+        }
+        neo4jDriver = driver;
+        neo4jSession = driver.session(SessionConfig.forDatabase("neo4j"));
+    }
+
+    /**
+     * Seeds one {@code :Test} node with a property of every tested type, runs the job and checks
+     * that exactly one {@code :TestTest} node with the same property values exists afterwards.
+     */
+    @Test
+    public void test() throws IOException, InterruptedException {
+        // given
+        // consume() waits for the query to finish, so the node exists before the job starts
+        neo4jSession.run("CREATE (t:Test {string:'foo', boolean:true, long:2147483648, double:1.7976931348623157E308, " +
+                "byteArray:$byteArray, date:date('2022-10-07'), localTime:localtime('20:04:00'), localDateTime:localdatetime('2022-10-07T20:04:00'), " +
+                "list:[0, 1], int:2147483647, float:$float})",
+            parameters("byteArray", new byte[]{(byte) 1}, "float", Float.MAX_VALUE)
+        ).consume();
+        // when
+        final Container.ExecResult execResult = executeSeaTunnelFlinkJob("/neo4j/neo4j_to_neo4j.conf");
+        // then
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        final Result result = neo4jSession.run("MATCH (tt:TestTest) RETURN tt");
+        // single() fails unless exactly one record is returned
+        final Node tt = result.single().get("tt").asNode();
+
+        assertEquals("foo", tt.get("string").asString());
+        assertTrue(tt.get("boolean").asBoolean());
+        assertEquals(2147483648L, tt.get("long").asLong());
+        assertEquals(Double.MAX_VALUE, tt.get("double").asDouble());
+        assertArrayEquals(new byte[]{(byte) 1}, tt.get("byteArray").asByteArray());
+        assertEquals(LocalDate.parse("2022-10-07"), tt.get("date").asLocalDate());
+        assertEquals(LocalTime.parse("20:04:00"), tt.get("localTime").asLocalTime());
+        assertEquals(LocalDateTime.parse("2022-10-07T20:04:00"), tt.get("localDateTime").asLocalDateTime());
+        final ArrayList<Integer> expectedList = new ArrayList<>();
+        expectedList.add(0);
+        expectedList.add(1);
+        assertTrue(tt.get("list").asList(Value::asInt).containsAll(expectedList));
+        assertEquals(2147483647, tt.get("int").asInt());
+        // mapValue is filled by the sink query from the 'int' entry of the map column
+        assertEquals(2147483647, tt.get("mapValue").asInt());
+        assertEquals(Float.MAX_VALUE, tt.get("float").asFloat());
+    }
+
+    /**
+     * Releases the session, the driver and the container; each may be null if {@link #init()} failed early.
+     */
+    @AfterAll
+    public void close() {
+        if (neo4jSession != null) {
+            neo4jSession.close();
+        }
+        if (neo4jDriver != null) {
+            neo4jDriver.close();
+        }
+        if (container != null) {
+            container.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/neo4j/neo4j_to_neo4j.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/neo4j/neo4j_to_neo4j.conf
new file mode 100644
index 000000000..4d98b03a9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/neo4j/neo4j_to_neo4j.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is the e2e test job: it reads the :Test node from Neo4j and writes it back as a :TestTest node
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "STREAM"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+    Neo4j {
+        uri = "neo4j://neo4j-host:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+
+        query = "MATCH (t:Test) WITH *, t{.int} AS _map RETURN t.string, t.boolean, t.long, t.double, t.byteArray, t.date, t.localTime, t.localDateTime, _map, t.list, t.int, t.float"
+
+        schema {
+            fields {
+                t.string=STRING
+                t.boolean=BOOLEAN
+                t.long=BIGINT
+                t.double=DOUBLE
+                t.null=NULL
+                t.byteArray=BYTES
+                t.date=DATE
+                t.localTime=TIME
+                t.localDateTime=TIMESTAMP
+                _map="MAP<STRING, INT>"
+                t.list="ARRAY<INT>"
+                t.int=INT
+                t.float=FLOAT
+            }
+        }
+    }
+}
+
+transform {
+}
+
+sink {
+  Neo4j {
+      uri = "neo4j://neo4j-host:7687"
+      username = "neo4j"
+      password = "1234"
+      database = "neo4j"
+
+      max_transaction_retry_time = 1
+      max_connection_timeout = 1
+
+      query = "CREATE (tt:TestTest {string:$string, boolean:$boolean, long:$long, double:$double, byteArray:$byteArray, date:date($date), localTime:localtime($localTime), localDateTime:localdatetime($localDateTime), list:$list, int:$int, float:$float, mapValue:$map['int']})"
+      queryParamPosition = {
+          string=0
+          boolean=1
+          long=2
+          double=3
+          byteArray=5
+          date=6
+          localTime=7
+          localDateTime=8
+          map=9
+          list=10
+          int=11
+          float=12
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index aea126d7f..7f84dfc1e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -36,6 +36,7 @@
         <module>connector-fake-flink-e2e</module>
         <module>connector-mongodb-flink-e2e</module>
         <module>connector-iceberg-flink-e2e</module>
+        <module>connector-neo4j-flink-e2e</module>
         <module>connector-influxdb-flink-e2e</module>
         <module>connector-kafka-flink-e2e</module>
     </modules>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/pom.xml
similarity index 54%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/pom.xml
index 885905feb..bbe7d601c 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/pom.xml
@@ -1,50 +1,61 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-
-    <modules>
-        <module>connector-spark-e2e-base</module>
-        <module>connector-datahub-spark-e2e</module>
-        <module>connector-fake-spark-e2e</module>
-        <module>connector-file-spark-e2e</module>
-        <module>connector-iotdb-spark-e2e</module>
-        <module>connector-jdbc-spark-e2e</module>
-        <module>connector-mongodb-spark-e2e</module>
-        <module>connector-kafka-spark-e2e</module>
-        <module>connector-influxdb-spark-e2e</module>
-    </modules>
+    <artifactId>connector-neo4j-spark-e2e</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
+            <artifactId>connector-spark-e2e-base</artifactId>
             <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-    </dependencies>
 
-</project>
\ No newline at end of file
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-neo4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java
new file mode 100644
index 000000000..43a1df7f6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.neo4j;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.neo4j.driver.Values.parameters;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.types.Node;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test of the Neo4j source and sink connectors on the Spark engine.
+ *
+ * <p>A Neo4j container is started once for the class. The test seeds a {@code :Test} node, runs the
+ * {@code /neo4j/neo4j_to_neo4j.conf} job and verifies the resulting {@code :TestTest} node.
+ */
+@Slf4j
+public class Neo4jIT extends SparkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    // network alias of the database container inside the shared Docker network
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    // URI used by the test itself; it relies on the fixed host port binding set up in init()
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    /**
+     * Starts the Neo4j container and waits (at most 30 seconds) until a verified driver
+     * connection to it could be established.
+     */
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withExposedPorts(CONTAINER_PORT)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
+        // bind the Bolt port to the same port on the host so that CONTAINER_URI is reachable
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", CONTAINER_PORT, CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("container started");
+        // initConnection() throws while the database is not reachable yet; retry until it succeeds
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+    }
+
+    /**
+     * Opens the driver and session used by the test.
+     *
+     * <p>Creating a driver does not by itself prove that the server is reachable, so connectivity
+     * is verified explicitly. On failure the driver is closed before the exception is rethrown,
+     * so that the retries performed by {@link #init()} do not leak drivers.
+     */
+    private void initConnection() {
+        final Driver driver = GraphDatabase.driver(CONTAINER_URI, AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
+        try {
+            driver.verifyConnectivity();
+        } catch (RuntimeException e) {
+            driver.close();
+            throw e;
+        }
+        neo4jDriver = driver;
+        neo4jSession = driver.session(SessionConfig.forDatabase("neo4j"));
+    }
+
+    /**
+     * Seeds one {@code :Test} node, runs the job and checks that exactly one {@code :TestTest}
+     * node with the expected property values exists afterwards. Unlike the seeded properties of
+     * the Flink variant of this test, no local-time property is created or verified here.
+     */
+    @Test
+    public void test() throws IOException, InterruptedException {
+        // given
+        // consume() waits for the query to finish, so the node exists before the job starts
+        neo4jSession.run("CREATE (t:Test {string:'foo', boolean:true, long:2147483648, double:1.7976931348623157E308, " +
+                "byteArray:$byteArray, date:date('2022-10-07'), localDateTime:localdatetime('2022-10-07T20:04:00'), " +
+                "list:[0, 1], int:2147483647, float:$float})",
+            parameters("byteArray", new byte[]{(byte) 1}, "float", Float.MAX_VALUE)
+        ).consume();
+        // when
+        final Container.ExecResult execResult = executeSeaTunnelSparkJob("/neo4j/neo4j_to_neo4j.conf");
+        // then
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        final Result result = neo4jSession.run("MATCH (tt:TestTest) RETURN tt");
+        // single() fails unless exactly one record is returned
+        final Node tt = result.single().get("tt").asNode();
+
+        assertEquals("foo", tt.get("string").asString());
+        assertTrue(tt.get("boolean").asBoolean());
+        assertEquals(2147483648L, tt.get("long").asLong());
+        assertEquals(Double.MAX_VALUE, tt.get("double").asDouble());
+        assertArrayEquals(new byte[]{(byte) 1}, tt.get("byteArray").asByteArray());
+        assertEquals(LocalDate.parse("2022-10-07"), tt.get("date").asLocalDate());
+        assertEquals(LocalDateTime.parse("2022-10-07T20:04:00"), tt.get("localDateTime").asLocalDateTime());
+        final ArrayList<Integer> expectedList = new ArrayList<>();
+        expectedList.add(0);
+        expectedList.add(1);
+        assertTrue(tt.get("list").asList(Value::asInt).containsAll(expectedList));
+        assertEquals(2147483647, tt.get("int").asInt());
+        // NOTE(review): mapValue is presumably derived by the job from the seeded int property - confirm against the job config
+        assertEquals(2147483647, tt.get("mapValue").asInt());
+        assertEquals(Float.MAX_VALUE, tt.get("float").asFloat());
+    }
+
+    /**
+     * Releases the session, the driver and the container; each may be null if {@link #init()} failed early.
+     */
+    @AfterAll
+    public void close() {
+        if (neo4jSession != null) {
+            neo4jSession.close();
+        }
+        if (neo4jDriver != null) {
+            neo4jDriver.close();
+        }
+        if (container != null) {
+            container.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..89ed3ad31
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/neo4j/neo4j_to_neo4j.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/neo4j/neo4j_to_neo4j.conf
new file mode 100644
index 000000000..f876b0e85
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/neo4j/neo4j_to_neo4j.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+    Neo4j {
+        uri = "neo4j://neo4j-host:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+
+        query = "MATCH (t:Test) WITH *, t{.int} AS _map RETURN t.string, t.boolean, t.long, t.double, t.byteArray, t.date, t.localDateTime, _map, t.list, t.int, t.float"
+
+        schema {
+            fields {
+                t.string=STRING
+                t.boolean=BOOLEAN
+                t.long=BIGINT
+                t.double=DOUBLE
+                t.null=NULL
+                t.byteArray=BYTES
+                t.date=DATE
+                t.localDateTime=TIMESTAMP
+                _map="MAP<STRING, INT>"
+                t.list="ARRAY<INT>"
+                t.int=INT
+                t.float=FLOAT
+            }
+        }
+    }
+}
+
+transform {
+}
+
+sink {
+  Neo4j {
+      uri = "neo4j://neo4j-host:7687"
+      username = "neo4j"
+      password = "1234"
+      database = "neo4j"
+
+      max_transaction_retry_time = 1
+      max_connection_timeout = 1
+
+      query = "CREATE (tt:TestTest {string:$string, boolean:$boolean, long:$long, double:$double, byteArray:$byteArray, date:date($date), localDateTime:localdatetime($localDateTime), list:$list, int:$int, float:$float, mapValue:$map['int']})"
+      queryParamPosition = {
+          string=0
+          boolean=1
+          long=2
+          double=3
+          byteArray=5
+          date=6
+          localDateTime=7
+          map=8
+          list=9
+          int=10
+          float=11
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 885905feb..a859112a3 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-iotdb-spark-e2e</module>
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-mongodb-spark-e2e</module>
+        <module>connector-neo4j-spark-e2e</module>
         <module>connector-kafka-spark-e2e</module>
         <module>connector-influxdb-spark-e2e</module>
     </modules>