You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/19 09:02:24 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-v2] Neo4j sink connector (#2434)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 950b27d13 [Feature][Connector-v2] Neo4j sink connector (#2434)
950b27d13 is described below

commit 950b27d13244ae89b5a94201018aaca5d60256a0
Author: Namgung Chan <33...@users.noreply.github.com>
AuthorDate: Fri Aug 19 18:02:17 2022 +0900

    [Feature][Connector-v2] Neo4j sink connector (#2434)
    
    * ConnectorV2 - Neo4j Sink
---
 docs/en/connector-v2/sink/Neo4j.md                 |  82 ++++++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |   1 +
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 .../{ => connector-neo4j}/pom.xml                  |  52 +++-----
 .../seatunnel/neo4j/config/DriverBuilder.java      |  76 ++++++++++++
 .../seatunnel/neo4j/config/Neo4jConfig.java        |  48 +++++++
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 138 +++++++++++++++++++++
 .../seatunnel/neo4j/sink/Neo4jSinkWriter.java      |  74 +++++++++++
 .../src/main/resources/examples/fake_to_neo4j.conf |  63 ++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 11 files changed, 503 insertions(+), 38 deletions(-)

diff --git a/docs/en/connector-v2/sink/Neo4j.md b/docs/en/connector-v2/sink/Neo4j.md
new file mode 100644
index 000000000..4ab8017fe
--- /dev/null
+++ b/docs/en/connector-v2/sink/Neo4j.md
@@ -0,0 +1,82 @@
+# Neo4j
+
+> Neo4j sink connector
+
+## Description
+
+Write data to Neo4j. 
+
+`neo4j-java-driver` version 4.4.9
+
+## Options
+
+| name                       | type   | required | default value |
+|----------------------------|--------|----------|---------------|
+| uri                        | String | Yes      | -             |
+| username                   | String | No       | -             |
+| password                   | String | No       | -             |
+| bearer_token               | String | No       | -             |
+| kerberos_ticket            | String | No       | -             |
+| database                   | String | Yes      | -             |
+| query                      | String | Yes      | -             |
+| queryParamPosition         | Object | Yes      | -             |
+| max_transaction_retry_time | Long   | No       | 30            |
+| max_connection_timeout     | Long   | No       | 30            |
+
+
+### uri [string]
+The URI of the Neo4j database, for example `neo4j://localhost:7687`. The scheme must be `neo4j`.
+
+### username [string]
+Username for basic authentication. At least one of `username`, `bearer_token` or `kerberos_ticket` must be set.
+
+### password [string]
+Password for basic authentication. Required if `username` is provided.
+
+### bearer_token [string]
+Base64-encoded bearer token used to authenticate to Neo4j.
+
+### kerberos_ticket [string]
+Base64-encoded Kerberos ticket used to authenticate to Neo4j.
+
+### database [string]
+database name.
+
+### query [string]
+Query statement. It may contain parameter placeholders (such as `$name`) that are substituted with the corresponding values at runtime.
+
+### queryParamPosition [object]
+position mapping information for query parameters.
+
+The key is the parameter placeholder name.
+
+The associated value is the zero-based position of the field in the input data row.
+
+
+### max_transaction_retry_time [long]
+Maximum transaction retry time in seconds. The transaction fails if this time is exceeded.
+
+### max_connection_timeout [long]
+The maximum amount of time to wait for a TCP connection to be established (seconds)
+
+
+## Example
+```
+sink {
+  Neo4j {
+    uri = "neo4j://localhost:7687"
+    username = "neo4j"
+    password = "1234"
+    database = "neo4j"
+
+    max_transaction_retry_time = 10
+    max_connection_timeout = 10
+
+    query = "CREATE (a:Person {name: $name, age: $age})"
+    queryParamPosition = {
+        name = 0
+        age = 1
+    }
+  }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 9d6983871..bf9e17a38 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -117,3 +117,4 @@ seatunnel.sink.DingTalk = connector-dingtalk
 seatunnel.sink.elasticsearch = connector-elasticsearch
 seatunnel.source.IoTDB = connector-iotdb
 seatunnel.sink.IoTDB = connector-iotdb
+seatunnel.sink.Neo4j = connector-neo4j
diff --git a/pom.xml b/pom.xml
index 70d209999..4679fa7f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,6 +227,7 @@
         <checker.qual.version>3.10.0</checker.qual.version>
         <iotdb.version>0.13.1</iotdb.version>
         <awaitility.version>4.2.0</awaitility.version>
+        <neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
     </properties>
 
     <dependencyManagement>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 5cb853f27..80be3a66c 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -131,6 +131,11 @@
             <artifactId>connector-iotdb</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-neo4j</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-neo4j/pom.xml
similarity index 50%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-neo4j/pom.xml
index ed84c6dce..a4f2f481b 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-neo4j/pom.xml
@@ -17,53 +17,29 @@
     limitations under the License.
 
 -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
-        <artifactId>seatunnel</artifactId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <artifactId>seatunnel-connectors-v2</artifactId>
 
-    <modules>
-        <module>connector-common</module>
-        <module>connector-clickhouse</module>
-        <module>connector-console</module>
-        <module>connector-fake</module>
-        <module>connector-http</module>
-        <module>connector-jdbc</module>
-        <module>connector-kafka</module>
-        <module>connector-pulsar</module>
-        <module>connector-socket</module>
-        <module>connector-hive</module>
-        <module>connector-file</module>
-        <module>connector-hudi</module>
-        <module>connector-assert</module>
-        <module>connector-kudu</module>
-        <module>connector-email</module>
-        <module>connector-dingtalk</module>
-        <module>connector-elasticsearch</module>
-        <module>connector-iotdb</module>
-    </modules>
+    <artifactId>connector-neo4j</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j.driver</groupId>
+            <artifactId>neo4j-java-driver</artifactId>
+            <version>${neo4j-java-driver.version}</version>
+        </dependency>
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
new file mode 100644
index 000000000..ac8914e46
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/** Serializable set of Neo4j connection settings that can create a {@link Driver} on demand. */
+@Getter
+@Setter
+public class DriverBuilder implements Serializable {
+    private final URI uri;
+    private String username;
+    private String password;
+    private String bearerToken;
+    private String kerberosTicket;
+    private String database;
+    // Both limits are in seconds; while null, build() does not set the matching driver option.
+    private Long maxTransactionRetryTimeSeconds;
+    private Long maxConnectionTimeoutSeconds;
+
+    private DriverBuilder(URI uri) {
+        this.uri = uri;
+    }
+
+    public static DriverBuilder create(URI uri) {
+        return new DriverBuilder(uri);
+    }
+
+    /** Creates a driver from the current settings; throws IllegalArgumentException if no credential is set. */
+    public Driver build() {
+        final Config.ConfigBuilder settings = Config.builder().withMaxConnectionPoolSize(1);
+        if (maxConnectionTimeoutSeconds != null) {
+            settings.withConnectionAcquisitionTimeout(maxConnectionTimeoutSeconds * 2, TimeUnit.SECONDS);
+            settings.withConnectionTimeout(maxConnectionTimeoutSeconds, TimeUnit.SECONDS);
+        }
+        if (maxTransactionRetryTimeSeconds != null) {
+            settings.withMaxTransactionRetryTime(maxTransactionRetryTimeSeconds, TimeUnit.SECONDS);
+        }
+        final Config driverConfig = settings.build();
+        // The first credential that is set wins: username/password, then bearer token, then kerberos ticket.
+        if (username != null) {
+            return GraphDatabase.driver(uri, AuthTokens.basic(username, password), driverConfig);
+        }
+        if (bearerToken != null) {
+            return GraphDatabase.driver(uri, AuthTokens.bearer(bearerToken), driverConfig);
+        }
+        if (kerberosTicket != null) {
+            return GraphDatabase.driver(uri, AuthTokens.kerberos(kerberosTicket), driverConfig);
+        }
+        throw new IllegalArgumentException("Invalid Field");
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
new file mode 100644
index 000000000..660437286
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Getter
+@Setter
+public class Neo4jConfig implements Serializable {
+    // Plugin name and the option keys that Neo4jSink reads from the sink configuration.
+    public static final String PLUGIN_NAME = "Neo4j";
+    public static final String KEY_NEO4J_URI = "uri";
+    public static final String KEY_USERNAME = "username";
+    public static final String KEY_PASSWORD = "password";
+    public static final String KEY_BEARER_TOKEN = "bearer_token";
+    public static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded
+
+    public static final String KEY_DATABASE = "database";
+    public static final String KEY_QUERY = "query";
+    public static final String KEY_QUERY_PARAM_POSITION = "queryParamPosition";
+    public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
+    public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
+
+    // Values Neo4jSink.prepare() fills in; queryParamPosition maps a placeholder name to a row field position.
+    private DriverBuilder driverBuilder;
+    private String query;
+    private Map<String, Object> queryParamPosition;
+
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
new file mode 100644
index 000000000..2285ef064
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
+
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_BEARER_TOKEN;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_KERBEROS_TICKET;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_NEO4J_URI;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY_PARAM_POSITION;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.PLUGIN_NAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.neo4j.driver.AuthTokens;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** Sink that validates the Neo4j options and hands the resulting settings to its writers. */
+@AutoService(SeaTunnelSink.class)
+public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void> {
+    private SeaTunnelRowType rowType;
+    private final Neo4jConfig neo4jConfig = new Neo4jConfig();
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    /** Checks the options and stores the connection settings, the query and its parameter mapping. */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        neo4jConfig.setDriverBuilder(prepareDriver(config));
+
+        final CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_QUERY, KEY_QUERY_PARAM_POSITION);
+        if (!queryConfigCheck.isSuccess()) {
+            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, queryConfigCheck.getMsg());
+        }
+        neo4jConfig.setQuery(config.getString(KEY_QUERY));
+        neo4jConfig.setQueryParamPosition(config.getObject(KEY_QUERY_PARAM_POSITION).unwrapped());
+    }
+
+    /**
+     * Validates the connection options and collects them into a {@link DriverBuilder}.
+     * @throws PrepareFailException if a required option is missing or the uri scheme is not `neo4j`
+     */
+    private DriverBuilder prepareDriver(Config config) {
+        final CheckResult uriConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI, KEY_DATABASE);
+        final CheckResult authConfigCheck = CheckConfigUtil.checkAtLeastOneExists(config, KEY_USERNAME, KEY_BEARER_TOKEN, KEY_KERBEROS_TICKET);
+        final CheckResult mergedConfigCheck = CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
+        if (!mergedConfigCheck.isSuccess()) {
+            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, mergedConfigCheck.getMsg());
+        }
+
+        final URI uri = URI.create(config.getString(KEY_NEO4J_URI));
+        if (!"neo4j".equals(uri.getScheme())) {
+            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, "uri scheme is not `neo4j`");
+        }
+
+        final DriverBuilder driverBuilder = DriverBuilder.create(uri);
+        if (config.hasPath(KEY_USERNAME)) {
+            // A missing password must be reported from its own check result, not the merged one above.
+            final CheckResult pwParamCheck = CheckConfigUtil.checkAllExists(config, KEY_PASSWORD);
+            if (!pwParamCheck.isSuccess()) {
+                throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, pwParamCheck.getMsg());
+            }
+            driverBuilder.setUsername(config.getString(KEY_USERNAME));
+            driverBuilder.setPassword(config.getString(KEY_PASSWORD));
+        } else if (config.hasPath(KEY_BEARER_TOKEN)) {
+            final String bearerToken = config.getString(KEY_BEARER_TOKEN);
+            AuthTokens.bearer(bearerToken);
+            driverBuilder.setBearerToken(bearerToken);
+        } else {
+            final String kerberosTicket = config.getString(KEY_KERBEROS_TICKET);
+            AuthTokens.kerberos(kerberosTicket);
+            // Kept in its own field so DriverBuilder.build() selects kerberos authentication.
+            driverBuilder.setKerberosTicket(kerberosTicket);
+        }
+        driverBuilder.setDatabase(config.getString(KEY_DATABASE));
+
+        if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT)) {
+            driverBuilder.setMaxConnectionTimeoutSeconds(config.getLong(KEY_MAX_CONNECTION_TIMEOUT));
+        }
+        if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME)) {
+            driverBuilder.setMaxTransactionRetryTimeSeconds(config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME));
+        }
+        return driverBuilder;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.rowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.rowType;
+    }
+
+    /** Creates a writer that uses the configuration built by {@link #prepare(Config)}. */
+    @Override
+    public SinkWriter<SeaTunnelRow, Void, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new Neo4jSinkWriter(neo4jConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
new file mode 100644
index 000000000..74214c9eb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Query;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Sink writer that runs the configured query once for every row, each in its own write transaction. */
+@Slf4j
+public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
+    private final Neo4jConfig config;
+    private final transient Driver driver;
+    private final transient Session session;
+
+    public Neo4jSinkWriter(Neo4jConfig neo4jConfig) {
+        this.config = neo4jConfig;
+        this.driver = config.getDriverBuilder().build();
+        this.session = driver.session(SessionConfig.forDatabase(neo4jConfig.getDriverBuilder().getDatabase()));
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        final Map<String, Object> parameters = config.getQueryParamPosition().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> element.getField((Integer) e.getValue())));
+        session.writeTransaction(tx -> {
+            tx.run(new Query(config.getQuery(), parameters));
+            return null;
+        });
+    }
+
+    @Override
+    public Optional<Void> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+    }
+
+    @Override
+    public void close() throws IOException {
+        // The driver is the try resource, so it is closed even when closing the session throws.
+        try (Driver closedLast = driver) {
+            session.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf b/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf
new file mode 100644
index 000000000..3938a749b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "STREAM"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+}
+
+sink {
+  Neo4j {
+    uri = "neo4j://localhost:7687"
+    username = "neo4j"
+    password = "1234"
+    database = "neo4j"
+
+    max_transaction_retry_time = 1
+    max_connection_timeout = 1
+
+    query = "CREATE (a:Person {name: $name, age: $age})"
+    queryParamPosition = {
+        name = 0
+        age = 1
+    }
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ed84c6dce..ae54d5232 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -49,6 +49,7 @@
         <module>connector-dingtalk</module>
         <module>connector-elasticsearch</module>
         <module>connector-iotdb</module>
+        <module>connector-neo4j</module>
     </modules>
 
     <dependencies>