Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/18 11:31:30 UTC

[GitHub] [incubator-seatunnel] getChan opened a new pull request, #2777: [Feature][Connector-v2] Neo4j source connector

getChan opened a new pull request, #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   Add a Neo4j source connector.
   
   ## Check list
   
   * [X] Code changes are covered with tests, or they do not need tests for the following reason:
   * [X] If any new JAR binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [X] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

EricJoy2048 commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r995579861


##########
docs/en/connector-v2/source/Neo4j.md:
##########
@@ -0,0 +1,100 @@
+# Neo4j
+
+> Neo4j source connector
+
+## Description
+
+Read data from Neo4j.
+
+`neo4j-java-driver` version 4.4.9
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type   | required | default value |
+|----------------------------|--------|----------|---------------|
+| uri                        | String | Yes      | -             |
+| username                   | String | No       | -             |
+| password                   | String | No       | -             |
+| bearer_token               | String | No       | -             |
+| kerberos_ticket            | String | No       | -             |
+| database                   | String | Yes      | -             |
+| query                      | String | Yes      | -             |
+| schema.fields              | Object | Yes      | -             |
+| max_transaction_retry_time | Long   | No       | 30            |
+| max_connection_timeout     | Long   | No       | 30            |
+
+### uri [string]
+
+The URI of the Neo4j database, for example `neo4j://localhost:7687`.
+
+### username [string]
+
+Username used to authenticate with Neo4j.
+
+### password [string]
+
+Password used to authenticate with Neo4j. Required if `username` is provided.
+
+### bearer_token [string]
+
+Base64-encoded bearer token used to authenticate with Neo4j.
+
+### kerberos_ticket [string]
+
+Base64-encoded Kerberos ticket used to authenticate with Neo4j.
+
+### database [string]
+
+Name of the database to read from.
+
+### query [string]
+
+The Cypher query to execute.
+
+### schema.fields [object]
+
+The fields returned by `query`, with their types.
+
+See [schema projection](../../concept/connector-v2-features.md).
+
+### max_transaction_retry_time [long]
+
+Maximum transaction retry time, in seconds. The transaction fails if this time is exceeded.
+
+### max_connection_timeout [long]
+
+The maximum amount of time, in seconds, to wait for a TCP connection to be established.
+
+## Example
+
+```
+source {
+    Neo4j {
+        uri = "neo4j://localhost:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+    
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+    
+        query = "MATCH (a:Person) RETURN a.name, a.age"
+    
+        schema {
+            fields {
+                a.age=INT
+                a.name=STRING

Review Comment:
   @hailin0  PTAL
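The rules stated in the `Neo4j.md` hunk above (`uri`, `database` and `query` are required, `password` is required whenever `username` is given, and both timeouts default to 30 seconds) could be enforced by a small validation step. The sketch below is a hypothetical helper (`Neo4jOptionCheck` is not part of the PR) that works on a plain `Map` of option values rather than SeaTunnel's config API, and it checks only a subset of the options:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: applies the documented defaults and the
// "password is required if username is provided" rule to raw option values.
final class Neo4jOptionCheck {
    // Documented default for both timeout options, in seconds.
    static final long DEFAULT_SECONDS = 30L;

    private Neo4jOptionCheck() {
    }

    static Map<String, Object> validate(Map<String, Object> raw) {
        // Only a subset of the required options is checked here (schema.fields is skipped).
        for (String required : new String[] {"uri", "database", "query"}) {
            if (!raw.containsKey(required)) {
                throw new IllegalArgumentException("missing required option: " + required);
            }
        }
        if (raw.containsKey("username") && !raw.containsKey("password")) {
            throw new IllegalArgumentException("password is required if username is provided");
        }
        // Copy so the caller's map is left untouched, then fill in the defaults.
        Map<String, Object> resolved = new HashMap<>(raw);
        resolved.putIfAbsent("max_transaction_retry_time", DEFAULT_SECONDS);
        resolved.putIfAbsent("max_connection_timeout", DEFAULT_SECONDS);
        return resolved;
    }
}
```

Failing fast at configuration time like this would surface a missing password before any connection attempt is made.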





[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2777: [Feature][Connector-v2] Neo4j source connector

CalvinKirs commented on PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#issuecomment-1286520037

   Thanks:)




[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2777: [Feature][Connector-v2] Neo4j source connector

EricJoy2048 commented on PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#issuecomment-1277066426

   > @EricJoy2048 @hailin0 I applied your reviews, except for the `TIME` data type on Spark, which Spark does not support.
   
   Yes, I know.




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

hailin0 commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r973879427


##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/Neo4jSourceConfig.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+@Getter
+@Setter
+public class Neo4jSourceConfig implements Serializable {
+
+    public static final String PLUGIN_NAME = "Neo4j";
+
+    public static final String KEY_NEO4J_URI = "uri";
+    public static final String KEY_USERNAME = "username";
+    public static final String KEY_PASSWORD = "password";
+    public static final String KEY_BEARER_TOKEN = "bearer_token";
+    public static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded
+
+    public static final String KEY_DATABASE = "database";
+    public static final String KEY_QUERY = "query";
+    public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
+    public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";

Review Comment:
   Extract common configuration to `Neo4jCommonConfig`?
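A minimal sketch of the suggested extraction, assuming that option keys shared by the source and a possible future sink move into a base class, while source-only keys (assumed here to be just `query`) stay in `Neo4jSourceConfig`. The Lombok annotations and driver-builder fields of the real class are omitted:

```java
import java.io.Serializable;

// Hypothetical base class for the review suggestion: keys that a Neo4j source
// and a possible sink would both need.
class Neo4jCommonConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    public static final String PLUGIN_NAME = "Neo4j";

    public static final String KEY_NEO4J_URI = "uri";
    public static final String KEY_USERNAME = "username";
    public static final String KEY_PASSWORD = "password";
    public static final String KEY_BEARER_TOKEN = "bearer_token";
    public static final String KEY_KERBEROS_TICKET = "kerberos_ticket"; // Base64 encoded
    public static final String KEY_DATABASE = "database";
    public static final String KEY_MAX_TRANSACTION_RETRY_TIME = "max_transaction_retry_time";
    public static final String KEY_MAX_CONNECTION_TIMEOUT = "max_connection_timeout";
}

// Source-only keys stay here; assumed in this sketch to be just the query.
class Neo4jSourceConfig extends Neo4jCommonConfig {
    private static final long serialVersionUID = 1L;

    public static final String KEY_QUERY = "query";
}
```

Because static fields are inherited for lookup, existing references such as `Neo4jSourceConfig.KEY_NEO4J_URI` would keep compiling after the move.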



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
+
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Query;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class Neo4jSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+
+    private final SingleSplitReaderContext context;
+    private final Neo4jSourceConfig config;
+    private final SeaTunnelRowType rowType;
+    private final Driver driver;
+    private Session session;
+
+    public Neo4jSourceReader(SingleSplitReaderContext context, Neo4jSourceConfig config, SeaTunnelRowType rowType) {
+        this.context = context;
+        this.config = config;
+        this.driver = config.getDriverBuilder().build();
+        this.rowType = rowType;
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.session = driver.session(SessionConfig.forDatabase(config.getDriverBuilder().getDatabase()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        session.close();
+        driver.close();
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        final Query query = new Query(config.getQuery());
+        session.readTransaction(tx -> {
+            final Result result = tx.run(query);
+            result.stream()
+                .forEach(row -> {
+                    final Object[] fields = new Object[rowType.getTotalFields()];
+                    for (int i = 0; i < rowType.getTotalFields(); i++) {
+                        final String fieldName = rowType.getFieldName(i);
+                        final SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
+                        final Value value = row.get(fieldName);
+                        fields[i] = convertType(fieldType, value);
+                    }
+                    output.collect(new SeaTunnelRow(fields));
+                });
+            return null;
+        });
+        this.context.signalNoMoreElement();
+    }
+
+    public static Object convertType(SeaTunnelDataType<?> dataType, Value value) {
+        Objects.requireNonNull(dataType);
+        Objects.requireNonNull(value);
+
+        if (dataType.equals(BasicType.STRING_TYPE)) {
+            return value.asString();
+        } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
+            return value.asBoolean();
+        } else if (dataType.equals(BasicType.LONG_TYPE)) {
+            return value.asLong();
+        } else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
+            return value.asDouble();
+        } else if (dataType.equals(BasicType.VOID_TYPE)) {
+            return null;
+        } else if (dataType.equals(PrimitiveByteArrayType.INSTANCE)) {
+            return value.asByteArray();
+        } else if (dataType.equals(LocalTimeType.LOCAL_DATE_TYPE)) {
+            return value.asLocalDate();
+        } else if (dataType.equals(LocalTimeType.LOCAL_TIME_TYPE)) {
+            return value.asLocalTime();
+        } else if (dataType.equals(LocalTimeType.LOCAL_DATE_TIME_TYPE)) {
+            return value.asLocalDateTime();
+        } else if (dataType instanceof MapType) {
+            if (!((MapType<?, ?>) dataType).getKeyType().equals(BasicType.STRING_TYPE)) {
+                throw new IllegalArgumentException("Key Type of MapType must String type");
+            }
+            return value.asMap();
+        } else if (dataType.equals(BasicType.INT_TYPE)) {
+            return value.asInt();
+        } else if (dataType.equals(BasicType.FLOAT_TYPE)) {
+            return value.asFloat();
+        } else {
+            throw new IllegalArgumentException("not supported data type: " + dataType);
+        }

Review Comment:
   Use `switch(dataType.getSqlType())` ?
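One possible shape for the `switch`-based version is sketched below. It assumes that `getSqlType()` returns an enum; the `SqlType` enum here is a hypothetical local stand-in that lists only the cases `convertType` handles, and `raw` stands in for `Value#asObject()`, which is assumed to yield `Long` for integers and `Double` for floating-point numbers:

```java
import java.util.Map;

final class SqlTypeSwitchSketch {
    // Hypothetical stand-in for SeaTunnel's SqlType enum; only the cases handled below.
    enum SqlType { STRING, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, NULL, BYTES, DATE, TIME, TIMESTAMP, MAP }

    private SqlTypeSwitchSketch() {
    }

    // `raw` plays the role of Neo4j's Value#asObject(): assumed Long for integers,
    // Double for floating-point numbers and java.time objects for temporal values.
    static Object convert(SqlType sqlType, Object raw) {
        switch (sqlType) {
            case STRING:
                return (String) raw;
            case BOOLEAN:
                return (Boolean) raw;
            case INT:
                // Throws ArithmeticException instead of silently truncating.
                return Math.toIntExact((Long) raw);
            case BIGINT:
                return (Long) raw;
            case FLOAT:
                return ((Double) raw).floatValue();
            case DOUBLE:
                return (Double) raw;
            case NULL:
                return null;
            case BYTES:
                return (byte[]) raw;
            case DATE:
            case TIME:
            case TIMESTAMP:
                // Already LocalDate / LocalTime / LocalDateTime; passed through unchanged.
                return raw;
            case MAP:
                return (Map<?, ?>) raw;
            default:
                throw new IllegalArgumentException("not supported data type: " + sqlType);
        }
    }
}
```

Compared with the `equals` chain, a `switch` gives each supported type a single case and lets `default` catch anything unhandled. Narrowing with `Math.toIntExact` fails loudly on overflow.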





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

hailin0 commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r994208584


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.neo4j;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.neo4j.driver.Values.parameters;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.types.Node;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class Neo4jIT extends FlinkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withExposedPorts(CONTAINER_PORT)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(log));

Review Comment:
   ```suggestion
               .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
   ```
   
   https://github.com/apache/incubator-seatunnel/pull/3028
   



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.neo4j;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.neo4j.driver.Values.parameters;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.types.Node;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class Neo4jIT extends SparkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withExposedPorts(CONTAINER_PORT)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(log));

Review Comment:
   ```suggestion
               .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CONTAINER_IMAGE)));
   ```
   
   https://github.com/apache/incubator-seatunnel/pull/3028



##########
docs/en/connector-v2/source/Neo4j.md:
##########
@@ -0,0 +1,100 @@
+        schema {
+            fields {
+                a.age=INT
+                a.name=STRING

Review Comment:
   Better not to include `.` in field names?
   
   e.g.
   
   xxx_age = INT
   xxx_name = STRING
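One way to follow this suggestion is to alias the projected properties in Cypher, for example `MATCH (a:Person) RETURN a.name AS a_name, a.age AS a_age`, and then declare `a_name` and `a_age` in `schema.fields`. Because the reader in this PR looks values up with `row.get(fieldName)`, the schema names must match the returned column names. The hypothetical helper below (not part of the PR) flags dotted names and proposes an underscore alias:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: finds schema field names containing '.'
// and proposes an underscore alias to use with Cypher's AS keyword.
final class FieldNameCheck {
    private FieldNameCheck() {
    }

    static List<String> suggestAliases(List<String> fieldNames) {
        List<String> hints = new ArrayList<>();
        for (String name : fieldNames) {
            if (name.contains(".")) {
                // e.g. "a.age" -> "a.age AS a_age"
                hints.add(name + " AS " + name.replace('.', '_'));
            }
        }
        return hints;
    }
}
```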





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

hailin0 commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r979142460


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/neo4j/neo4j_to_assert.conf:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+    Neo4j {
+        uri = "neo4j://neo4j-host:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+
+        query = "MATCH (a:Person) RETURN a.name, a.age"
+
+        schema {
+            fields {

Review Comment:
   test all datatypes?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf#L36
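A sketch of fixture data covering the Neo4j-side types that `convertType` handles, written as a hypothetical builder (`AllTypesFixture` is not from the PR). The map could be passed as query parameters when creating a test node. A `MAP` column would instead have to come from the query expression itself, since Neo4j node properties cannot hold maps:

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical fixture builder, not part of the PR: one sample value per
// property-storable type that convertType knows how to read back.
final class AllTypesFixture {
    private AllTypesFixture() {
    }

    static Map<String, Object> sampleParameters() {
        // LinkedHashMap keeps insertion order, which makes failures easier to read.
        Map<String, Object> p = new LinkedHashMap<>();
        p.put("string", "Alice");
        p.put("boolean", true);
        p.put("long", 123L);
        p.put("int", 42);
        p.put("double", 1.5d);
        p.put("float", 2.5f);
        p.put("bytes", "neo4j".getBytes(StandardCharsets.UTF_8));
        p.put("date", LocalDate.of(2022, 9, 18));
        p.put("time", LocalTime.of(11, 31, 30));
        p.put("datetime", LocalDateTime.of(2022, 9, 18, 11, 31, 30));
        return p;
    }
}
```

The keys would then be referenced as `$string`, `$date` and so on in a `CREATE` statement run through `session.run(query, parameters)`. Neo4j stores integers and floats as 64-bit values, so the `int` and `float` entries would come back as `Long` and `Double`, which exercises the narrowing in the `INT` and `FLOAT` branches.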



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/neo4j/neo4j_to_assert.conf:
##########
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #job.mode = "STREAM"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+    Neo4j {
+        uri = "neo4j://neo4j-host:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+
+        query = "MATCH (a:Person) RETURN a.name, a.age"
+
+        schema {
+            fields {

Review Comment:
   test all datatypes?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf#L36



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/resources/neo4j/fake_to_neo4j.conf:
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      schema = {
+        fields = {

Review Comment:
   test all datatypes?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf#L36



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/resources/neo4j/fake_to_neo4j.conf:
##########
@@ -30,7 +30,12 @@ source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age"
+      schema = {
+        fields = {
+            name=STRING
+            age=INT

Review Comment:
   test all datatypes?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf#L36



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.neo4j;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class Neo4jIT extends FlinkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withExposedPorts(CONTAINER_PORT)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", CONTAINER_PORT, CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+
+    }
+
+    private void initConnection() {
+        neo4jDriver = GraphDatabase.driver(CONTAINER_URI, AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
+        neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j"));
+    }
+
+    @Test
+    public void testSink() throws IOException, InterruptedException {

Review Comment:
   Can you test the source and sink in a single job?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java#L106





[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#issuecomment-1279719446

   Hi @getChan, thanks for your contribution. Please resolve the conflicts.




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r979770162


##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_BEARER_TOKEN;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_KERBEROS_TICKET;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_CONNECTION_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_NEO4J_URI;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_QUERY;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.KEY_USERNAME;
+import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.neo4j.driver.AuthTokens;
+
+import java.net.URI;
+
+@AutoService(SeaTunnelSource.class)
+public class Neo4jSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+    private final Neo4jSourceConfig neo4jSourceConfig = new Neo4jSourceConfig();
+    private SeaTunnelRowType rowType;
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        neo4jSourceConfig.setDriverBuilder(prepareDriver(pluginConfig));
+
+        final CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY);
+        if (!queryConfigCheck.isSuccess()) {
+            throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, queryConfigCheck.getMsg());
+        }
+        neo4jSourceConfig.setQuery(pluginConfig.getString(KEY_QUERY));
+
+        final CheckResult schemaConfigCheck = CheckConfigUtil.checkAllExists(pluginConfig, SeaTunnelSchema.SCHEMA);

Review Comment:
   Why not use `CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY, SeaTunnelSchema.SCHEMA)` to check both options in one call?
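   The suggested consolidation replaces two separate checks with one varargs call, so a single failure message can list every missing option. The sketch below only mimics that idea: a plain `Map` stands in for the shaded Typesafe `Config`, and `ConfigCheckSketch.missingKeys` is a hypothetical helper, not SeaTunnel's `CheckConfigUtil` API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigCheckSketch {

    // Hypothetical stand-in for a varargs "check all exist" call:
    // collects every key that is absent (or mapped to null) so that
    // one error can report all of them instead of failing on the first.
    public static List<String> missingKeys(Map<String, ?> config, String... keys) {
        List<String> missing = new ArrayList<>();
        for (String key : keys) {
            if (config.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> pluginConfig = new HashMap<>();
        pluginConfig.put("query", "MATCH (a:Person) RETURN a.name, a.age");
        // "schema" is deliberately absent to show the combined report.
        List<String> missing = missingKeys(pluginConfig, "query", "schema");
        if (!missing.isEmpty()) {
            // In the connector, this is where a single PrepareFailException would be thrown.
            System.out.println("missing options: " + String.join(", ", missing));
        }
    }
}
```

   With one combined check, `prepare` needs only one `isSuccess()` branch for both `query` and `schema`.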



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.neo4j;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class Neo4jIT extends FlinkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withExposedPorts(CONTAINER_PORT)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", CONTAINER_PORT, CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+
+    }
+
+    private void initConnection() {
+        neo4jDriver = GraphDatabase.driver(CONTAINER_URI, AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
+        neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j"));
+    }
+
+    @Test
+    public void testSink() throws IOException, InterruptedException {
+        // when
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/neo4j/fake_to_neo4j.conf");
+
+        // then
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        final Result result = neo4jSession.run("MATCH (a:Person) RETURN a.name, a.age");

Review Comment:
   Please test all of the data types.



##########
seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
+
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Query;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.Value;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class Neo4jSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
+
+    private final SingleSplitReaderContext context;
+    private final Neo4jSourceConfig config;
+    private final SeaTunnelRowType rowType;
+    private final Driver driver;
+    private Session session;
+
+    public Neo4jSourceReader(SingleSplitReaderContext context, Neo4jSourceConfig config, SeaTunnelRowType rowType) {
+        this.context = context;
+        this.config = config;
+        this.driver = config.getDriverBuilder().build();
+        this.rowType = rowType;
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.session = driver.session(SessionConfig.forDatabase(config.getDriverBuilder().getDatabase()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        session.close();
+        driver.close();
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        final Query query = new Query(config.getQuery());
+        session.readTransaction(tx -> {
+            final Result result = tx.run(query);
+            result.stream()
+                .forEach(row -> {
+                    final Object[] fields = new Object[rowType.getTotalFields()];
+                    for (int i = 0; i < rowType.getTotalFields(); i++) {
+                        final String fieldName = rowType.getFieldName(i);
+                        final SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
+                        final Value value = row.get(fieldName);
+                        fields[i] = convertType(fieldType, value);
+                    }
+                    output.collect(new SeaTunnelRow(fields));
+                });
+            return null;
+        });
+        this.context.signalNoMoreElement();
+    }
+
+    public static Object convertType(SeaTunnelDataType<?> dataType, Value value) {
+        Objects.requireNonNull(dataType);
+        Objects.requireNonNull(value);
+
+        switch (dataType.getSqlType()) {
+            case STRING:

Review Comment:
   Can you add a case that maps `Neo4j.List` to `SqlType.ARRAY`?
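   One possible shape for that mapping is sketched below. It assumes the element type is already known from the schema and that SeaTunnel array fields are carried as Java arrays (for example `String[]` for an array of strings). A plain `List<?>` stands in for the driver's `Value.asList()` result, and `toTypedArray` is a hypothetical helper, not code from this PR.

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

public class ListToArraySketch {

    // Copies a driver-side list into a typed Java array, e.g. List<Object> -> String[].
    // elementType must be a reference type (String.class, Long.class, ...).
    // Throws ClassCastException if an element is not an instance of elementType.
    @SuppressWarnings("unchecked")
    public static <T> T[] toTypedArray(List<?> values, Class<T> elementType) {
        T[] result = (T[]) Array.newInstance(elementType, values.size());
        for (int i = 0; i < values.size(); i++) {
            result[i] = elementType.cast(values.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for value.asList() on a Neo4j LIST<STRING> property.
        List<Object> fromNeo4j = Arrays.asList("a", "b", "c");
        String[] field = toTypedArray(fromNeo4j, String.class);
        System.out.println(Arrays.toString(field));
    }
}
```

   The Neo4j driver returns integers as `Long`, so an int array column would need a per-element numeric conversion rather than the plain cast used here.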



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-neo4j-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/neo4j/Neo4jIT.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.neo4j;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class Neo4jIT extends SparkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withExposedPorts(CONTAINER_PORT)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", CONTAINER_PORT, CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+
+    }
+
+    private void initConnection() {
+        neo4jDriver = GraphDatabase.driver(CONTAINER_URI, AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
+        neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j"));
+    }
+
+    @Test
+    public void testSink() throws IOException, InterruptedException {
+        // when
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/neo4j/fake_to_neo4j.conf");
+
+        // then
+        Assertions.assertEquals(0, execResult.getExitCode());

Review Comment:
   Same as the Flink e2e test.



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-neo4j-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/neo4j/Neo4jIT.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.neo4j;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class Neo4jIT extends FlinkContainer {
+
+    private static final String CONTAINER_IMAGE = "neo4j:latest";
+    private static final String CONTAINER_HOST = "neo4j-host";
+    private static final int CONTAINER_PORT = 7687;
+    private static final String CONTAINER_NEO4J_USERNAME = "neo4j";
+    private static final String CONTAINER_NEO4J_PASSWORD = "1234";
+    private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + CONTAINER_PORT);
+
+    private GenericContainer<?> container;
+    private Driver neo4jDriver;
+    private Session neo4jSession;
+
+    @BeforeAll
+    public void init() {
+        DockerImageName imageName = DockerImageName.parse(CONTAINER_IMAGE);
+        container = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(CONTAINER_HOST)
+            .withExposedPorts(CONTAINER_PORT)
+            .withEnv("NEO4J_AUTH", CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", CONTAINER_PORT, CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(container)).join();
+        log.info("container started");
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(30, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+
+    }
+
+    private void initConnection() {
+        neo4jDriver = GraphDatabase.driver(CONTAINER_URI, AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD));
+        neo4jSession = neo4jDriver.session(SessionConfig.forDatabase("neo4j"));
+    }
+
+    @Test
+    public void testSink() throws IOException, InterruptedException {
+        // when
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/neo4j/fake_to_neo4j.conf");
+
+        // then
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        final Result result = neo4jSession.run("MATCH (a:Person) RETURN a.name, a.age");
+        Assertions.assertTrue(result.stream().findAny().isPresent());
+        Assertions.assertTrue(result.stream().anyMatch(record -> record.get("a.age").asInt() > 0));
+
+    }
+
+    @Test
+    public void testSource() throws IOException, InterruptedException {
+        // given
+        neo4jSession.run("CREATE (a:Person {name: 'foo', age: 10})");

Review Comment:
   Same as above.





[GitHub] [incubator-seatunnel] getChan commented on pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
getChan commented on PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#issuecomment-1272584907

   @EricJoy2048 @hailin0 
   I applied your review comments, except for the `TIME` data type on Spark, because Spark does not support it.




[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#issuecomment-1250484803

   Hi, thanks for your contribution. Could you add an e2e test for this PR?
   
   https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r1001313492


##########
docs/en/connector-v2/source/Neo4j.md:
##########
@@ -0,0 +1,100 @@
+# Neo4j
+
+> Neo4j source connector
+
+## Description
+
+Read data from Neo4j.
+
+`neo4j-java-driver` version 4.4.9
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type   | required | default value |
+|----------------------------|--------|----------|---------------|
+| uri                        | String | Yes      | -             |
+| username                   | String | No       | -             |
+| password                   | String | No       | -             |
+| bearer_token               | String | No       | -             |
+| kerberos_ticket            | String | No       | -             |
+| database                   | String | Yes      | -             |
+| query                      | String | Yes      | -             |
+| schema.fields              | Object | Yes      | -             |
+| max_transaction_retry_time | Long   | No       | 30            |
+| max_connection_timeout     | Long   | No       | 30            |
+
+### uri [string]
+
+The URI of the Neo4j database, for example `neo4j://localhost:7687`.
+
+### username [string]
+
+The username used to authenticate with Neo4j.
+
+### password [string]
+
+The password used to authenticate with Neo4j. Required if `username` is provided.
+
+### bearer_token [string]
+
+A base64-encoded bearer token used to authenticate with Neo4j.
+
+### kerberos_ticket [string]
+
+A base64-encoded Kerberos ticket used to authenticate with Neo4j.
+
+### database [string]
+
+The name of the Neo4j database.
+
+### query [string]
+
+The Cypher query to execute.
+
+### schema.fields [object]
+
+The fields returned by `query`, with their SeaTunnel data types.
+
+See [schema projection](../../concept/connector-v2-features.md).
+
+### max_transaction_retry_time [long]
+
+The maximum transaction retry time, in seconds. The transaction fails if this time is exceeded.
+
+### max_connection_timeout [long]
+
+The maximum amount of time, in seconds, to wait for a TCP connection to be established.
+
+## Example
+
+```
+source {
+    Neo4j {
+        uri = "neo4j://localhost:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+    
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+    
+        query = "MATCH (a:Person) RETURN a.name, a.age"
+    
+        schema {
+            fields {
+                a.age=INT
+                a.name=STRING

Review Comment:
   > Is dot naming in field names forbidden in SeaTunnel?
   
   You can use it; SeaTunnel currently has no restrictions on field names.
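   Dotted names work here because the reader in this PR looks each column up by the exact schema field name (`row.get(fieldName)`), and for `RETURN a.name, a.age` Neo4j names the result columns literally `a.name` and `a.age` unless the query aliases them. The sketch below mirrors that lookup with a `Map` standing in for a Neo4j `Record`; `FieldProjectionSketch.project` is a hypothetical helper for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FieldProjectionSketch {

    // Builds one row by looking up each schema field name verbatim in the record,
    // so a name such as "a.age" is an ordinary key, not a nested path.
    public static Object[] project(Map<String, Object> record, List<String> fieldNames) {
        Object[] fields = new Object[fieldNames.size()];
        for (int i = 0; i < fieldNames.size(); i++) {
            fields[i] = record.get(fieldNames.get(i));
        }
        return fields;
    }

    public static void main(String[] args) {
        // Column names as Neo4j reports them for "RETURN a.name, a.age".
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("a.name", "foo");
        record.put("a.age", 10L);
        // Field order follows the schema, not the query's column order.
        Object[] row = project(record, Arrays.asList("a.age", "a.name"));
        System.out.println(Arrays.toString(row));
    }
}
```

   Aliasing in Cypher (`RETURN a.name AS name`) would let the schema use undotted names instead, if that is preferred.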





[GitHub] [incubator-seatunnel] getChan commented on pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
getChan commented on PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#issuecomment-1255079808

   @hailin0
   Thanks for your review. I fixed it and added an e2e test.




[GitHub] [incubator-seatunnel] getChan commented on a diff in pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
getChan commented on code in PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777#discussion_r994627702


##########
docs/en/connector-v2/source/Neo4j.md:
##########
@@ -0,0 +1,100 @@
+# Neo4j
+
+> Neo4j source connector
+
+## Description
+
+Read data from Neo4j.
+
+`neo4j-java-driver` version 4.4.9
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                       | type   | required | default value |
+|----------------------------|--------|----------|---------------|
+| uri                        | String | Yes      | -             |
+| username                   | String | No       | -             |
+| password                   | String | No       | -             |
+| bearer_token               | String | No       | -             |
+| kerberos_ticket            | String | No       | -             |
+| database                   | String | Yes      | -             |
+| query                      | String | Yes      | -             |
+| schema.fields              | Object | Yes      | -             |
+| max_transaction_retry_time | Long   | No       | 30            |
+| max_connection_timeout     | Long   | No       | 30            |
+
+### uri [string]
+
+The URI of the Neo4j database, for example `neo4j://localhost:7687`.
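A minimal sketch of sanity-checking the `uri` option before connecting. It assumes the accepted schemes are the six that neo4j-java-driver 4.x understands (`neo4j`, `bolt`, and their `+s` / `+ssc` TLS variants); the class name and the check itself are hypothetical, not taken from the connector.

```java
import java.net.URI;
import java.util.Set;

// Hypothetical sketch: validate the `uri` option using only java.net.URI.
class Neo4jUriCheck {

    // URI schemes supported by the 4.x Java driver.
    static final Set<String> SCHEMES =
            Set.of("neo4j", "neo4j+s", "neo4j+ssc", "bolt", "bolt+s", "bolt+ssc");

    static URI parse(String uri) {
        URI parsed = URI.create(uri); // throws IllegalArgumentException on bad syntax
        if (parsed.getScheme() == null || !SCHEMES.contains(parsed.getScheme())) {
            throw new IllegalArgumentException("unsupported Neo4j URI scheme: " + uri);
        }
        if (parsed.getHost() == null) {
            throw new IllegalArgumentException("missing host in Neo4j URI: " + uri);
        }
        return parsed;
    }

    public static void main(String[] args) {
        URI uri = parse("neo4j://localhost:7687");
        System.out.println(uri.getHost() + ":" + uri.getPort()); // localhost:7687
    }
}
```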
+
+### username [string]
+
+The username used to authenticate with Neo4j.
+
+### password [string]
+
+The password for `username`. Required if `username` is provided.
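The options above list four optional ways to authenticate but do not say how they interact. A minimal sketch, assuming the connector picks exactly one method and rejects a `username` without a `password`; the precedence order and all names below are hypothetical. In neo4j-java-driver 4.x the matching token factories would be `AuthTokens.basic`, `AuthTokens.bearer`, `AuthTokens.kerberos` and `AuthTokens.none`, which this stdlib-only sketch does not call.

```java
import java.util.Map;

// Hypothetical sketch of validating the Neo4j auth options.
// Assumed precedence: basic (username/password), then bearer, then kerberos, else none.
class Neo4jAuthOptions {

    enum AuthMethod { BASIC, BEARER, KERBEROS, NONE }

    static AuthMethod chooseAuth(Map<String, String> options) {
        if (options.containsKey("username")) {
            // The docs state that password is required when username is provided.
            if (!options.containsKey("password")) {
                throw new IllegalArgumentException("password is required when username is provided");
            }
            return AuthMethod.BASIC;
        }
        if (options.containsKey("bearer_token")) {
            return AuthMethod.BEARER;
        }
        if (options.containsKey("kerberos_ticket")) {
            return AuthMethod.KERBEROS;
        }
        return AuthMethod.NONE;
    }

    public static void main(String[] args) {
        System.out.println(chooseAuth(Map.of("username", "neo4j", "password", "1234"))); // BASIC
        System.out.println(chooseAuth(Map.of("bearer_token", "dG9rZW4=")));              // BEARER
        System.out.println(chooseAuth(Map.of()));                                        // NONE
    }
}
```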
+
+### bearer_token [string]
+
+A base64-encoded bearer token used to authenticate with Neo4j.
+
+### kerberos_ticket [string]
+
+A base64-encoded Kerberos ticket used to authenticate with Neo4j.
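Both `bearer_token` and `kerberos_ticket` are described as base64-encoded strings. A minimal sketch of producing and checking such a value with the JDK's `java.util.Base64`; the class name and the placeholder input are hypothetical, and how the raw token or ticket bytes are obtained is out of scope here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: encode raw credential bytes for the bearer_token / kerberos_ticket options.
// "my-token" is a placeholder, not a real credential.
class Base64Credentials {

    static String encode(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    static byte[] decode(String encoded) {
        // Throws IllegalArgumentException if the value is not valid base64.
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        String token = encode("my-token".getBytes(StandardCharsets.UTF_8));
        System.out.println(token);                                              // bXktdG9rZW4=
        System.out.println(new String(decode(token), StandardCharsets.UTF_8)); // my-token
    }
}
```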
+
+### database [string]
+
+The name of the database to read from.
+
+### query [string]
+
+The Cypher query to execute.
+
+### schema.fields [object]
+
+The fields returned by `query`.
+
+See [schema projection](../../concept/connector-v2-features.md).
+
+### max_transaction_retry_time [long]
+
+The maximum time, in seconds, to retry a failed transaction. The transaction fails if this time is exceeded.
+
+### max_connection_timeout [long]
+
+The maximum time, in seconds, to wait for a TCP connection to be established.
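Both timeout options are plain second counts with a documented default of 30. A minimal sketch of reading them and turning them into `java.time.Duration` values; the class name and the non-negative check are hypothetical. Presumably such values end up in the driver's `Config.builder()` through `withMaxTransactionRetryTime(long, TimeUnit)` and `withConnectionTimeout(long, TimeUnit)`, but that wiring is not shown in the diff and is an assumption.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical sketch: read a timeout option given in seconds,
// falling back to the documented default of 30 seconds.
class Neo4jTimeouts {

    static final long DEFAULT_SECONDS = 30L;

    static Duration seconds(Map<String, Long> options, String key) {
        long value = options.getOrDefault(key, DEFAULT_SECONDS);
        if (value < 0) {
            throw new IllegalArgumentException(key + " must not be negative");
        }
        return Duration.ofSeconds(value);
    }

    public static void main(String[] args) {
        // Mirrors the example config, which sets max_transaction_retry_time = 1.
        Map<String, Long> options = Map.of("max_transaction_retry_time", 1L);
        System.out.println(seconds(options, "max_transaction_retry_time")); // PT1S
        System.out.println(seconds(options, "max_connection_timeout"));     // PT30S
    }
}
```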
+
+## Example
+
+```
+source {
+    Neo4j {
+        uri = "neo4j://localhost:7687"
+        username = "neo4j"
+        password = "1234"
+        database = "neo4j"
+    
+        max_transaction_retry_time = 1
+        max_connection_timeout = 1
+    
+        query = "MATCH (a:Person) RETURN a.name, a.age"
+    
+        schema {
+            fields {
+                a.age=INT
+                a.name=STRING

Review Comment:
   @hailin0
   I think the dot (`.`) notation is familiar to Neo4j users,
   because a dot references a node's property.
   
   Is dot naming in field names forbidden in SeaTunnel?





[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #2777: [Feature][Connector-v2] Neo4j source connector

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #2777:
URL: https://github.com/apache/incubator-seatunnel/pull/2777

