You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/19 09:02:24 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-v2] Neo4j sink connector (#2434)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 950b27d13 [Feature][Connector-v2] Neo4j sink connector (#2434)
950b27d13 is described below
commit 950b27d13244ae89b5a94201018aaca5d60256a0
Author: Namgung Chan <33...@users.noreply.github.com>
AuthorDate: Fri Aug 19 18:02:17 2022 +0900
[Feature][Connector-v2] Neo4j sink connector (#2434)
* ConnectorV2 - Neo4j Sink
---
docs/en/connector-v2/sink/Neo4j.md | 82 ++++++++++++
plugin-mapping.properties | 1 +
pom.xml | 1 +
seatunnel-connectors-v2-dist/pom.xml | 5 +
.../{ => connector-neo4j}/pom.xml | 52 +++-----
.../seatunnel/neo4j/config/DriverBuilder.java | 76 ++++++++++++
.../seatunnel/neo4j/config/Neo4jConfig.java | 48 +++++++
.../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 138 +++++++++++++++++++++
.../seatunnel/neo4j/sink/Neo4jSinkWriter.java | 74 +++++++++++
.../src/main/resources/examples/fake_to_neo4j.conf | 63 ++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
11 files changed, 503 insertions(+), 38 deletions(-)
diff --git a/docs/en/connector-v2/sink/Neo4j.md b/docs/en/connector-v2/sink/Neo4j.md
new file mode 100644
index 000000000..4ab8017fe
--- /dev/null
+++ b/docs/en/connector-v2/sink/Neo4j.md
@@ -0,0 +1,82 @@
+# Neo4j
+
+> Neo4j sink connector
+
+## Description
+
+Write data to Neo4j.
+
+`neo4j-java-driver` version 4.4.9
+
+## Options
+
+| name | type | required | default value |
+|----------------------------|--------|----------|---------------|
+| uri | String | Yes | - |
+| username | String | No | - |
+| password | String | No | - |
+| bearer_token | String | No | - |
+| kerberos_ticket | String | No | - |
+| database | String | Yes | - |
+| query | String | Yes | - |
+| queryParamPosition | Object | Yes | - |
+| max_transaction_retry_time | Long | No | 30 |
+| max_connection_timeout | Long | No | 30 |
+
+
+### uri [string]
+The URI of the Neo4j database, for example `neo4j://localhost:7687`. The URI scheme must be `neo4j`.
+
+### username [string]
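+Username for Neo4j basic authentication. At least one of `username`, `bearer_token` or `kerberos_ticket` must be provided.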
+
+### password [string]
+Password for Neo4j basic authentication. Required if `username` is provided.
+
+### bearer_token [string]
+Base64-encoded bearer token used for Neo4j authentication.
+
+### kerberos_ticket [string]
+Base64-encoded Kerberos ticket used for Neo4j authentication.
+
+### database [string]
+database name.
+
+### query [string]
+Query statement. It may contain parameter placeholders that are substituted with the corresponding values at runtime.
+
+### queryParamPosition [object]
+Position mapping information for the query parameters.
+
+Each key is the name of a parameter placeholder in the query.
+
+The associated value is the position (starting at 0) of the field in the input data row.
+
+
+### max_transaction_retry_time [long]
+Maximum transaction retry time (seconds). The transaction fails if this time is exceeded.
+
+### max_connection_timeout [long]
+The maximum amount of time to wait for a TCP connection to be established (seconds)
+
+
+## Example
+```
+sink {
+ Neo4j {
+ uri = "neo4j://localhost:7687"
+ username = "neo4j"
+ password = "1234"
+ database = "neo4j"
+
+ max_transaction_retry_time = 10
+ max_connection_timeout = 10
+
+ query = "CREATE (a:Person {name: $name, age: $age})"
+ queryParamPosition = {
+ name = 0
+ age = 1
+ }
+ }
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 9d6983871..bf9e17a38 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -117,3 +117,4 @@ seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
+seatunnel.sink.Neo4j = connector-neo4j
diff --git a/pom.xml b/pom.xml
index 70d209999..4679fa7f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,6 +227,7 @@
<checker.qual.version>3.10.0</checker.qual.version>
<iotdb.version>0.13.1</iotdb.version>
<awaitility.version>4.2.0</awaitility.version>
+ <neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
</properties>
<dependencyManagement>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 5cb853f27..80be3a66c 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -131,6 +131,11 @@
<artifactId>connector-iotdb</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-neo4j</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-neo4j/pom.xml
similarity index 50%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-neo4j/pom.xml
index ed84c6dce..a4f2f481b 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-neo4j/pom.xml
@@ -17,53 +17,29 @@
limitations under the License.
-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>seatunnel</artifactId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-connectors-v2</artifactId>
- <modules>
- <module>connector-common</module>
- <module>connector-clickhouse</module>
- <module>connector-console</module>
- <module>connector-fake</module>
- <module>connector-http</module>
- <module>connector-jdbc</module>
- <module>connector-kafka</module>
- <module>connector-pulsar</module>
- <module>connector-socket</module>
- <module>connector-hive</module>
- <module>connector-file</module>
- <module>connector-hudi</module>
- <module>connector-assert</module>
- <module>connector-kudu</module>
- <module>connector-email</module>
- <module>connector-dingtalk</module>
- <module>connector-elasticsearch</module>
- <module>connector-iotdb</module>
- </modules>
+ <artifactId>connector-neo4j</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.neo4j.driver</groupId>
+ <artifactId>neo4j-java-driver</artifactId>
+ <version>${neo4j-java-driver.version}</version>
+ </dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
new file mode 100644
index 000000000..ac8914e46
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Serializable holder of Neo4j connection settings that creates a {@link Driver} on demand.
+ *
+ * <p>Accessors are generated by Lombok: getters for every field, setters for every
+ * non-final field.
+ */
+@Getter
+@Setter
+public class DriverBuilder implements Serializable {
+    private final URI uri;
+    private String username;
+    private String password;
+    private String bearerToken;
+    private String kerberosTicket;
+    private String database;
+
+    private Long maxTransactionRetryTimeSeconds;
+    private Long maxConnectionTimeoutSeconds;
+
+    private DriverBuilder(URI uri) {
+        this.uri = uri;
+    }
+
+    /**
+     * Static factory; the only way to obtain an instance.
+     *
+     * @param uri address of the Neo4j server
+     * @return a builder with only the URI set
+     */
+    public static DriverBuilder create(URI uri) {
+        return new DriverBuilder(uri);
+    }
+
+    /**
+     * Creates a driver from the settings collected so far.
+     *
+     * <p>The connection pool is limited to a single connection. The optional timeouts are
+     * applied only when they have been set; the connection acquisition timeout is twice the
+     * connection timeout. Credentials are picked in the order username/password, bearer token,
+     * Kerberos ticket.
+     *
+     * @return a new driver for {@link #uri}
+     * @throws IllegalArgumentException if none of username, bearer token or Kerberos ticket is set
+     */
+    public Driver build() {
+        final Config.ConfigBuilder settings = Config.builder();
+        settings.withMaxConnectionPoolSize(1);
+
+        if (maxConnectionTimeoutSeconds != null) {
+            settings.withConnectionAcquisitionTimeout(maxConnectionTimeoutSeconds * 2, TimeUnit.SECONDS);
+            settings.withConnectionTimeout(maxConnectionTimeoutSeconds, TimeUnit.SECONDS);
+        }
+        if (maxTransactionRetryTimeSeconds != null) {
+            settings.withMaxTransactionRetryTime(maxTransactionRetryTimeSeconds, TimeUnit.SECONDS);
+        }
+        final Config driverConfig = settings.build();
+
+        // First credential that is present wins.
+        if (username != null) {
+            return GraphDatabase.driver(uri, AuthTokens.basic(username, password), driverConfig);
+        }
+        if (bearerToken != null) {
+            return GraphDatabase.driver(uri, AuthTokens.bearer(bearerToken), driverConfig);
+        }
+        if (kerberosTicket != null) {
+            return GraphDatabase.driver(uri, AuthTokens.kerberos(kerberosTicket), driverConfig);
+        }
+        throw new IllegalArgumentException("Invalid Field");
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
new file mode 100644
index 000000000..660437286
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Option key names of the Neo4j sink plus the parsed settings handed to the sink writer.
+ *
+ * <p>Getters and setters for the instance fields are generated by Lombok.
+ */
+@Getter
+@Setter
+public class Neo4jConfig implements Serializable {
+
+ // Plugin name and connection/authentication option keys.
+ public static final String PLUGIN_NAME = "Neo4j";
+ public static final String KEY_NEO4J_URI = "uri";
+ public static final String KEY_USERNAME = "username";
+ public static final String KEY_PASSWORD = "password";
+ public static final String KEY_BEARER_TOKEN = "bearer_token";
+ public static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded
+
+ // Database, query and tuning option keys.
+ public static final String KEY_DATABASE = "database";
+ public static final String KEY_QUERY = "query";
+ public static final String KEY_QUERY_PARAM_POSITION = "queryParamPosition";
+ public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
+ public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
+
+
+ // Connection settings used to create the Neo4j driver.
+ private DriverBuilder driverBuilder;
+ // Query text executed for each row.
+ private String query;
+ // Maps a query parameter name to the position of the row field that supplies its value.
+ private Map<String, Object> queryParamPosition;
+
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
new file mode 100644
index 000000000..2285ef064
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
+
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_BEARER_TOKEN;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_KERBEROS_TICKET;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_NEO4J_URI;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_QUERY_PARAM_POSITION;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.KEY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig.PLUGIN_NAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.neo4j.driver.AuthTokens;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * SeaTunnel sink for Neo4j. Validates the sink options, collects them in a
+ * {@link Neo4jConfig} and creates a {@link Neo4jSinkWriter} from that configuration.
+ */
+@AutoService(SeaTunnelSink.class)
+public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void> {
+
+    private SeaTunnelRowType rowType;
+    private final Neo4jConfig neo4jConfig = new Neo4jConfig();
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    /**
+     * Validates the sink options and stores them in the sink's {@link Neo4jConfig}.
+     *
+     * @param config sink section of the job configuration
+     * @throws PrepareFailException if a connection, authentication or query option is missing,
+     *                              or the URI scheme is not {@code neo4j}
+     */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        neo4jConfig.setDriverBuilder(prepareDriver(config));
+
+        final CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_QUERY, KEY_QUERY_PARAM_POSITION);
+        if (!queryConfigCheck.isSuccess()) {
+            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, queryConfigCheck.getMsg());
+        }
+        neo4jConfig.setQuery(config.getString(KEY_QUERY));
+        neo4jConfig.setQueryParamPosition(config.getObject(KEY_QUERY_PARAM_POSITION).unwrapped());
+    }
+
+    /**
+     * Builds the {@link DriverBuilder} from the connection and authentication options.
+     *
+     * <p>{@code uri} and {@code database} are required, together with at least one of
+     * {@code username}, {@code bearer_token} or {@code kerberos_ticket}. When several
+     * authentication options are present, {@code username} takes precedence over
+     * {@code bearer_token}, which takes precedence over {@code kerberos_ticket}.
+     *
+     * @param config sink section of the job configuration
+     * @return a builder holding the URI, credentials, database and optional timeouts
+     * @throws PrepareFailException if a required option is missing, the URI scheme is not
+     *                              {@code neo4j}, or {@code username} is given without {@code password}
+     */
+    private DriverBuilder prepareDriver(Config config) {
+        final CheckResult uriConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI, KEY_DATABASE);
+        final CheckResult authConfigCheck = CheckConfigUtil.checkAtLeastOneExists(config, KEY_USERNAME, KEY_BEARER_TOKEN, KEY_KERBEROS_TICKET);
+        final CheckResult mergedConfigCheck = CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
+        if (!mergedConfigCheck.isSuccess()) {
+            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, mergedConfigCheck.getMsg());
+        }
+
+        final URI uri = URI.create(config.getString(KEY_NEO4J_URI));
+        if (!"neo4j".equals(uri.getScheme())) {
+            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, "uri scheme is not `neo4j`");
+        }
+
+        final DriverBuilder driverBuilder = DriverBuilder.create(uri);
+
+        if (config.hasPath(KEY_USERNAME)) {
+            // Basic authentication needs a password; test the password check itself
+            // (previously the already-passed merged check was tested again by mistake).
+            final CheckResult pwParamCheck = CheckConfigUtil.checkAllExists(config, KEY_PASSWORD);
+            if (!pwParamCheck.isSuccess()) {
+                throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, pwParamCheck.getMsg());
+            }
+            final String username = config.getString(KEY_USERNAME);
+            final String password = config.getString(KEY_PASSWORD);
+
+            driverBuilder.setUsername(username);
+            driverBuilder.setPassword(password);
+        } else if (config.hasPath(KEY_BEARER_TOKEN)) {
+            final String bearerToken = config.getString(KEY_BEARER_TOKEN);
+            // The returned token is discarded; the token actually used is created in DriverBuilder#build().
+            // NOTE(review): presumably kept as an early sanity check - confirm.
+            AuthTokens.bearer(bearerToken);
+            driverBuilder.setBearerToken(bearerToken);
+        } else {
+            // Reached only when kerberos_ticket is present, because at least one auth option exists.
+            final String kerberosTicket = config.getString(KEY_KERBEROS_TICKET);
+            // The returned token is discarded; see the note in the bearer token branch.
+            AuthTokens.kerberos(kerberosTicket);
+            // Store the ticket as a Kerberos ticket (it was previously stored as a bearer token).
+            driverBuilder.setKerberosTicket(kerberosTicket);
+        }
+
+        driverBuilder.setDatabase(config.getString(KEY_DATABASE));
+
+        // Optional tuning values; left unset when absent from the configuration.
+        if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT)) {
+            driverBuilder.setMaxConnectionTimeoutSeconds(config.getLong(KEY_MAX_CONNECTION_TIMEOUT));
+        }
+        if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME)) {
+            driverBuilder.setMaxTransactionRetryTimeSeconds(config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME));
+        }
+
+        return driverBuilder;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.rowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.rowType;
+    }
+
+    /**
+     * Creates the writer that executes the configured query for incoming rows.
+     *
+     * @param context writer context (not used)
+     * @return a new {@link Neo4jSinkWriter} built from the prepared configuration
+     */
+    @Override
+    public SinkWriter<SeaTunnelRow, Void, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new Neo4jSinkWriter(neo4jConfig);
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
new file mode 100644
index 000000000..74214c9eb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSinkWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Query;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Sink writer that runs the configured query once per row inside a Neo4j write transaction.
+ *
+ * <p>The driver and a single session are created in the constructor and released in
+ * {@link #close()}.
+ */
+@Slf4j
+public class Neo4jSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> {
+
+    private final Neo4jConfig config;
+    private final transient Driver driver;
+    private final transient Session session;
+
+    /**
+     * Builds the driver from the configuration and opens a session on the configured database.
+     *
+     * @param neo4jConfig prepared sink configuration (driver settings, query, parameter positions)
+     */
+    public Neo4jSinkWriter(Neo4jConfig neo4jConfig) {
+        this.config = neo4jConfig;
+        this.driver = config.getDriverBuilder().build();
+        this.session = driver.session(SessionConfig.forDatabase(neo4jConfig.getDriverBuilder().getDatabase()));
+    }
+
+    /**
+     * Binds the row's fields to the query parameters and runs the query in a write transaction.
+     *
+     * @param element row whose fields supply the query parameter values
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        // A plain HashMap is used instead of Collectors.toMap because toMap rejects null
+        // values, which would fail for any row that contains a null field.
+        final Map<String, Object> queryParams = new HashMap<>();
+        for (Map.Entry<String, Object> entry : config.getQueryParamPosition().entrySet()) {
+            queryParams.put(entry.getKey(), element.getField((Integer) entry.getValue()));
+        }
+        final Query query = new Query(config.getQuery(), queryParams);
+        session.writeTransaction(tx -> {
+            tx.run(query);
+            return null;
+        });
+    }
+
+    @Override
+    public Optional<Void> prepareCommit() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Closes the session and then the driver. The driver is closed even if closing the
+     * session fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            session.close();
+        } finally {
+            driver.close();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf b/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf
new file mode 100644
index 000000000..3938a749b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-neo4j/src/main/resources/examples/fake_to_neo4j.conf
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #job.mode = "STREAM"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+}
+
+sink {
+ Neo4j {
+ uri = "neo4j://localhost:7687"
+ username = "neo4j"
+ password = "1234"
+ database = "neo4j"
+
+ max_transaction_retry_time = 1
+ max_connection_timeout = 1
+
+ query = "CREATE (a:Person {name: $name, age: $age})"
+ queryParamPosition = {
+ name = 0
+ age = 1
+ }
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index ed84c6dce..ae54d5232 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -49,6 +49,7 @@
<module>connector-dingtalk</module>
<module>connector-elasticsearch</module>
<module>connector-iotdb</module>
+ <module>connector-neo4j</module>
</modules>
<dependencies>