You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/12 18:18:49 UTC

[GitHub] [beam] ibzib commented on a change in pull request #15916: [BEAM-1857] Add Neo4jIO (initial code drop)

ibzib commented on a change in pull request #15916:
URL: https://github.com/apache/beam/pull/15916#discussion_r747844724



##########
File path: sdks/java/io/neo4j/build.gradle
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.neo4j')
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Neo4j"
+ext.summary = "IO to read from and write to Neo4j graphs"
+
+dependencies {
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    compile "org.neo4j.driver:neo4j-java-driver:4.3.4"

Review comment:
       Note to self: this is apache 2 licensed https://mvnrepository.com/artifact/org.neo4j.driver/neo4j-java-driver

##########
File path: sdks/java/io/neo4j/build.gradle
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.neo4j')
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Neo4j"
+ext.summary = "IO to read from and write to Neo4j graphs"
+
+dependencies {
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    compile "org.neo4j.driver:neo4j-java-driver:4.3.4"
+    compile library.java.slf4j_api
+    compile library.java.vendored_guava_26_0_jre
+    testCompile library.java.junit
+    testCompile library.java.hamcrest_core
+    testCompile library.java.hamcrest_library
+    testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
+    testCompile project(path: ":sdks:java:testing:test-utils", configuration: "testRuntime")
+    testRuntimeOnly library.java.slf4j_jdk14
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+    testImplementation "org.testcontainers:neo4j:1.16.2"

Review comment:
       Note to self: this is MIT licensed, which is category A under Apache policy
   
   https://mvnrepository.com/artifact/org.testcontainers/neo4j/1.16.2
   https://www.apache.org/legal/resolved.html#category-a

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore

Review comment:
       Why is this safe to ignore?

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : "<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : "<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();
+
+      if (getEncryption() != null && getEncryption().get() != null) {
+        if (getEncryption().get()) {
+          configBuilder =
+              Config.builder()
+                  .withEncryption()
+                  .withTrustStrategy(Config.TrustStrategy.trustAllCertificates());
+        } else {
+          configBuilder = Config.builder().withoutEncryption();
+        }
+      }
+
+      // physical layer
+      if (getConnectionLivenessCheckTimeoutMs() != null
+          && getConnectionLivenessCheckTimeoutMs().get() != null
+          && getConnectionLivenessCheckTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionLivenessCheckTimeout(
+                getConnectionLivenessCheckTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionLifetimeMs() != null
+          && getMaxConnectionLifetimeMs().get() != null
+          && getMaxConnectionLifetimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxConnectionLifetime(
+                getMaxConnectionLifetimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionPoolSize() != null && getMaxConnectionPoolSize().get() > 0) {
+        configBuilder = configBuilder.withMaxConnectionPoolSize(getMaxConnectionPoolSize().get());
+      }
+      if (getConnectionAcquisitionTimeoutMs() != null
+          && getConnectionAcquisitionTimeoutMs().get() != null
+          && getConnectionAcquisitionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionAcquisitionTimeout(
+                getConnectionAcquisitionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getConnectionTimeoutMs() != null
+          && getConnectionTimeoutMs().get() != null
+          && getConnectionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionTimeout(
+                getConnectionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxTransactionRetryTimeMs() != null
+          && getMaxTransactionRetryTimeMs().get() != null
+          && getMaxTransactionRetryTimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxTransactionRetryTime(
+                getMaxTransactionRetryTimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+
+      // Set sane Logging level
+      //
+      configBuilder = configBuilder.withLogging(Logging.javaUtilLogging(Level.WARNING));
+
+      // Now we have the configuration for the driver
+      //
+      Config config = configBuilder.build();
+
+      // Get the list of the URI to connect with
+      //
+      List<URI> uris = new ArrayList<>();
+      if (getUrl() != null && getUrl().get() != null) {
+        try {
+          uris.add(new URI(getUrl().get()));
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Error creating URI from URL '" + getUrl().get() + "'", e);
+        }
+      }
+      if (getUrls() != null && getUrls().get() != null) {
+        List<String> urls = getUrls().get();
+        for (String url : urls) {
+          try {
+            uris.add(new URI(url));
+          } catch (URISyntaxException e) {
+            throw new RuntimeException(
+                "Error creating URI '"
+                    + getUrl().get()
+                    + "' from a list of "
+                    + urls.size()
+                    + " URLs",
+                e);
+          }
+        }
+      }
+
+      checkArgument(
+          getUsername() != null && getUsername().get() != null,
+          "please provide a username to connect to Neo4j");
+      checkArgument(
+          getPassword() != null && getPassword().get() != null,
+          "please provide a password to connect to Neo4j");
+
+      Driver driver;
+      if (getRouting() != null && getRouting().get() != null && getRouting().get()) {
+        driver =
+            GraphDatabase.routingDriver(
+                uris, AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      } else {
+        // Just take the first URI that was provided
+        driver =
+            GraphDatabase.driver(
+                uris.get(0), AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      }
+
+      // Now we create a
+      return driver;
+    }
+
+    /**
+     * The Builder class below is not visible. We use it to service the "with" methods below the
+     * Builder class.
+     */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setUrl(ValueProvider<String> url);
+
+      abstract Builder setUrls(ValueProvider<List<String>> url);
+
+      abstract Builder setUsername(ValueProvider<String> username);
+
+      abstract Builder setPassword(ValueProvider<String> password);
+
+      abstract Builder setEncryption(ValueProvider<Boolean> encryption);
+
+      abstract Builder setConnectionLivenessCheckTimeoutMs(
+          ValueProvider<Long> connectionLivenessCheckTimeoutMs);
+
+      abstract Builder setMaxConnectionLifetimeMs(ValueProvider<Long> maxConnectionLifetimeMs);
+
+      abstract Builder setMaxConnectionPoolSize(ValueProvider<Integer> maxConnectionPoolSize);
+
+      abstract Builder setConnectionAcquisitionTimeoutMs(
+          ValueProvider<Long> connectionAcquisitionTimeoutMs);
+
+      abstract Builder setConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs);
+
+      abstract Builder setMaxTransactionRetryTimeMs(ValueProvider<Long> maxTransactionRetryTimeMs);
+
+      abstract Builder setRouting(ValueProvider<Boolean> routing);
+
+      abstract DriverConfiguration build();
+    }
+  }
+
+  /** This is the class which handles the work behind the {@link #readAll} method. */
+  @AutoValue
+  public abstract static class ReadAll<ParameterT, OutputT>
+      extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
+
+    abstract @Nullable SerializableFunction<Void, Driver> getDriverProviderFn();
+
+    abstract @Nullable ValueProvider<String> getDatabase();
+
+    abstract @Nullable ValueProvider<String> getCypher();
+
+    abstract @Nullable ValueProvider<Boolean> getWriteTransaction();
+
+    abstract @Nullable ValueProvider<Long> getTransactionTimeoutMs();
+
+    abstract @Nullable RowMapper<OutputT> getRowMapper();
+
+    abstract @Nullable SerializableFunction<ParameterT, Map<String, Object>>
+        getParametersFunction();
+
+    abstract @Nullable Coder<OutputT> getCoder();
+
+    abstract @Nullable ValueProvider<Long> getFetchSize();
+
+    abstract @Nullable ValueProvider<Boolean> getLogCypher();
+
+    abstract Builder<ParameterT, OutputT> toBuilder();
+
+    // Driver configuration
+    public ReadAll<ParameterT, OutputT> withDriverConfiguration(DriverConfiguration config) {
+      return toBuilder()
+          .setDriverProviderFn(new DriverProviderFromDriverConfiguration(config))
+          .build();
+    }
+
+    // Cypher
+    public ReadAll<ParameterT, OutputT> withCypher(String cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.readAll().withCypher(query) called with null cypher query");
+      return withCypher(ValueProvider.StaticValueProvider.of(cypher));
+    }
+
+    public ReadAll<ParameterT, OutputT> withCypher(ValueProvider<String> cypher) {
+      checkArgument(cypher != null, "Neo4jIO.readAll().withCypher(cypher) called with null cypher");
+      return toBuilder().setCypher(cypher).build();
+    }
+
+    // Transaction timeout
+    public ReadAll<ParameterT, OutputT> withTransactionTimeoutMs(long timeout) {
+      checkArgument(
+          timeout > 0,
+          "Neo4jIO.readAll().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return withTransactionTimeoutMs(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public ReadAll<ParameterT, OutputT> withTransactionTimeoutMs(ValueProvider<Long> timeout) {
+      checkArgument(
+          timeout != null && timeout.get() > 0,
+          "Neo4jIO.readAll().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return toBuilder().setTransactionTimeoutMs(timeout).build();
+    }
+
+    // Database
+    public ReadAll<ParameterT, OutputT> withDatabase(String database) {
+      checkArgument(
+          database != null, "Neo4jIO.readAll().withDatabase(database) called with null database");
+      return withDatabase(ValueProvider.StaticValueProvider.of(database));
+    }
+
+    public ReadAll<ParameterT, OutputT> withDatabase(ValueProvider<String> database) {
+      checkArgument(
+          database != null, "Neo4jIO.readAll().withDatabase(database) called with null database");
+      return toBuilder().setDatabase(database).build();
+    }
+
+    // Fetch size
+    public ReadAll<ParameterT, OutputT> withFetchSize(long fetchSize) {
+      checkArgument(
+          fetchSize > 0, "Neo4jIO.readAll().withFetchSize(query) called with fetchSize<=0");
+      return withFetchSize(ValueProvider.StaticValueProvider.of(fetchSize));
+    }
+
+    public ReadAll<ParameterT, OutputT> withFetchSize(ValueProvider<Long> fetchSize) {
+      checkArgument(
+          fetchSize != null && fetchSize.get() >= 0,
+          "Neo4jIO.readAll().withFetchSize(query) called with fetchSize<=0");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
+    // Row mapper
+    public ReadAll<ParameterT, OutputT> withRowMapper(RowMapper<OutputT> rowMapper) {
+      checkArgument(
+          rowMapper != null,
+          "Neo4jIO.readAll().withRowMapper(rowMapper) called with null rowMapper");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    // Parameters mapper
+    public ReadAll<ParameterT, OutputT> withParametersFunction(
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction) {
+      checkArgument(
+          parametersFunction != null,
+          "Neo4jIO.readAll().withParametersFunction(parametersFunction) called with null parametersFunction");
+      return toBuilder().setParametersFunction(parametersFunction).build();
+    }
+
+    // Coder
+    public ReadAll<ParameterT, OutputT> withCoder(Coder<OutputT> coder) {
+      checkArgument(coder != null, "Neo4jIO.readAll().withCoder(coder) called with null coder");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    // Read/Write transaction
+    public ReadAll<ParameterT, OutputT> withReadTransaction() {
+      return toBuilder()
+          .setWriteTransaction(ValueProvider.StaticValueProvider.of(Boolean.FALSE))
+          .build();
+    }
+
+    public ReadAll<ParameterT, OutputT> withWriteTransaction() {
+      return toBuilder()
+          .setWriteTransaction(ValueProvider.StaticValueProvider.of(Boolean.TRUE))
+          .build();
+    }
+
+    // Log cypher statements
+    public ReadAll<ParameterT, OutputT> withoutCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    public ReadAll<ParameterT, OutputT> withCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    @Override
+    public PCollection<OutputT> expand(PCollection<ParameterT> input) {
+
+      checkArgument(
+          getCypher() != null && getCypher().get() != null,
+          "please provide a cypher statement to execute");
+
+      long fetchSize;
+      if (getFetchSize() == null || getFetchSize().get() <= 0) {
+        fetchSize = 0;
+      } else {
+        fetchSize = getFetchSize().get();
+      }
+
+      String databaseName = null;
+      if (getDatabase() != null) {
+        databaseName = getDatabase().get();
+      }
+
+      boolean writeTransaction = false;
+      if (getWriteTransaction() != null) {
+        writeTransaction = getWriteTransaction().get();
+      }
+
+      long transactionTimeOutMs;
+      if (getTransactionTimeoutMs() == null || getTransactionTimeoutMs().get() < 0) {
+        transactionTimeOutMs = -1;
+      } else {
+        transactionTimeOutMs = getTransactionTimeoutMs().get();
+      }
+
+      boolean logCypher = false;
+      if (getLogCypher() != null) {
+        logCypher = getLogCypher().get();
+      }
+
+      ReadFn<ParameterT, OutputT> readFn =
+          new ReadFn<>(
+              getDriverProviderFn(),
+              getCypher().get(),
+              getRowMapper(),
+              getParametersFunction(),
+              fetchSize,
+              databaseName,
+              writeTransaction,
+              transactionTimeOutMs,
+              logCypher);
+
+      return getOutputPCollection(input, readFn, getCoder());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("cypher", getCypher()));
+      builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+      builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+      if (getDriverProviderFn() instanceof HasDisplayData) {
+        ((HasDisplayData) getDriverProviderFn()).populateDisplayData(builder);
+      }
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<ParameterT, OutputT> {
+      abstract Builder<ParameterT, OutputT> setDriverProviderFn(
+          SerializableFunction<Void, Driver> driverProviderFn);
+
+      abstract Builder<ParameterT, OutputT> setCypher(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT, OutputT> setDatabase(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT, OutputT> setWriteTransaction(
+          ValueProvider<Boolean> writeTransaction);
+
+      abstract Builder<ParameterT, OutputT> setTransactionTimeoutMs(
+          ValueProvider<Long> transactionTimeoutMs);
+
+      abstract Builder<ParameterT, OutputT> setRowMapper(RowMapper<OutputT> rowMapper);
+
+      abstract Builder<ParameterT, OutputT> setParametersFunction(
+          SerializableFunction<ParameterT, Map<String, Object>> parametersFunction);
+
+      abstract Builder<ParameterT, OutputT> setCoder(Coder<OutputT> coder);
+
+      abstract Builder<ParameterT, OutputT> setFetchSize(ValueProvider<Long> fetchSize);
+
+      abstract Builder<ParameterT, OutputT> setLogCypher(ValueProvider<Boolean> logCypher);
+
+      abstract ReadAll<ParameterT, OutputT> build();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class ReadWriteFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+    protected final SerializableFunction<Void, Driver> driverProviderFn;
+    protected final String databaseName;
+    protected final long fetchSize;
+    protected final long transactionTimeoutMs;
+
+    protected transient Driver driver;
+    protected transient Session session;
+    protected transient TransactionConfig transactionConfig;
+
+    private ReadWriteFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String databaseName,
+        long fetchSize,
+        long transactionTimeoutMs) {
+      this.driverProviderFn = driverProviderFn;
+      this.databaseName = databaseName;
+      this.fetchSize = fetchSize;
+      this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+
+    /**
+     * Delay the creation of driver and session until we actually have data to do something with.
+     */
+    @Setup
+    public void setup() {}
+
+    protected void buildDriverAndSession() {
+      if (driver == null && session == null && transactionConfig == null) {
+        driver = driverProviderFn.apply(null);
+
+        SessionConfig.Builder builder = SessionConfig.builder();
+        if (databaseName != null) {
+          builder = builder.withDatabase(databaseName);
+        }
+        builder = builder.withFetchSize(fetchSize);
+        session = driver.session(builder.build());
+
+        TransactionConfig.Builder configBuilder = TransactionConfig.builder();
+        if (transactionTimeoutMs > 0) {
+          configBuilder = configBuilder.withTimeout(Duration.ofMillis(transactionTimeoutMs));
+        }
+        transactionConfig = configBuilder.build();
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      cleanUpDriverSession();
+    }
+
+    @Override
+    protected void finalize() {
+      cleanUpDriverSession();
+    }
+
+    protected void cleanUpDriverSession() {
+      if (session != null) {
+        try {
+          session.close();
+        } finally {
+          session = null;
+        }
+      }
+      if (driver != null) {
+        try {
+          driver.close();
+        } finally {
+          driver = null;
+        }
+      }
+    }
+
+    protected String getParametersString(Map<String, Object> parametersMap) {
+      StringBuilder parametersString = new StringBuilder();
+      parametersMap
+          .keySet()
+          .forEach(
+              key -> {
+                if (parametersString.length() > 0) {
+                  parametersString.append(',');
+                }
+                parametersString.append(key).append('=');
+                Object value = parametersMap.get(key);
+                if (value == null) {
+                  parametersString.append("<null>");
+                } else {
+                  parametersString.append(value);
+                }
+              });
+      return parametersString.toString();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class ReadFn<ParameterT, OutputT> extends ReadWriteFn<ParameterT, OutputT> {
+    private final String cypher;
+    private final RowMapper<OutputT> rowMapper;
+    private final SerializableFunction<ParameterT, Map<String, Object>> parametersFunction;
+    private final boolean writeTransaction;
+    private final boolean logCypher;
+
+    private ReadFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String cypher,
+        RowMapper<OutputT> rowMapper,
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction,
+        long fetchSize,
+        String databaseName,
+        boolean writeTransaction,
+        long transactionTimeoutMs,
+        boolean logCypher) {
+      super(driverProviderFn, databaseName, fetchSize, transactionTimeoutMs);
+      this.cypher = cypher;
+      this.rowMapper = rowMapper;
+      this.parametersFunction = parametersFunction;
+      this.writeTransaction = writeTransaction;
+      this.logCypher = logCypher;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      // Some initialization needs to take place as late as possible.
+      // If this method is not called (no input) then we don't need to create a session

Review comment:
       Runners are lazy and they won't construct a DoFn and call `@Setup` until there's input.
   
   So I think it's preferable to put this logic in `@Setup` (unless there's something I'm missing?)

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : "<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : "<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();

Review comment:
       Is there any reason we shouldn't let the user pass in their own `Config` directly, instead of duplicating all of its fields and building one for them?

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL

Review comment:
       Nit: many of these 1-2 word comments only repeat what the code says. They can be removed.

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),

Review comment:
       ```suggestion
    *        "id", row.getString("id"),
   ```

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jIOTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.util.Arrays;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class Neo4jIOTest {
+
+  @Test
+  public void testDriverConfigurationCreate() throws Exception {
+    Neo4jIO.DriverConfiguration driverConfiguration =
+        Neo4jIO.DriverConfiguration.create("someUrl", "username", "password");
+    Assert.assertEquals("someUrl", driverConfiguration.getUrl().get());
+    Assert.assertEquals("username", driverConfiguration.getUsername().get());
+    Assert.assertEquals("password", driverConfiguration.getPassword().get());
+  }
+
+  @Test
+  public void testDriverConfigurationWith() throws Exception {
+    Neo4jIO.DriverConfiguration driverConfiguration = Neo4jIO.DriverConfiguration.create();
+
+    driverConfiguration = driverConfiguration.withEncryption();
+    Assert.assertEquals(true, driverConfiguration.getEncryption().get());
+
+    driverConfiguration = driverConfiguration.withoutEncryption();
+    Assert.assertEquals(false, driverConfiguration.getEncryption().get());
+
+    driverConfiguration = driverConfiguration.withUrl("url1");
+    Assert.assertEquals("url1", driverConfiguration.getUrl().get());
+
+    // URL and URLs can be set independently but are both used
+    driverConfiguration = driverConfiguration.withUrls(Arrays.asList("url2", "url3", "url4"));
+    Assert.assertEquals(3, driverConfiguration.getUrls().get().size());
+
+    driverConfiguration = driverConfiguration.withUsername("username");
+    Assert.assertEquals("username", driverConfiguration.getUsername().get());
+
+    driverConfiguration = driverConfiguration.withPassword("password");
+    Assert.assertEquals("password", driverConfiguration.getPassword().get());
+
+    driverConfiguration = driverConfiguration.withRouting();
+    Assert.assertEquals(true, driverConfiguration.getRouting().get());
+
+    driverConfiguration = driverConfiguration.withoutRouting();
+    Assert.assertEquals(false, driverConfiguration.getRouting().get());
+
+    driverConfiguration = driverConfiguration.withConnectionAcquisitionTimeoutMs(54321L);
+    Assert.assertEquals(
+        54321L, (long) driverConfiguration.getConnectionAcquisitionTimeoutMs().get());
+
+    driverConfiguration = driverConfiguration.withConnectionTimeoutMs(43210L);
+    Assert.assertEquals(43210L, (long) driverConfiguration.getConnectionTimeoutMs().get());
+
+    driverConfiguration = driverConfiguration.withConnectionLivenessCheckTimeoutMs(32109L);
+    Assert.assertEquals(
+        32109L, (long) driverConfiguration.getConnectionLivenessCheckTimeoutMs().get());
+
+    driverConfiguration = driverConfiguration.withMaxConnectionLifetimeMs(21098L);
+    Assert.assertEquals(21098L, (long) driverConfiguration.getMaxConnectionLifetimeMs().get());
+
+    driverConfiguration = driverConfiguration.withMaxTransactionRetryTimeMs(10987L);
+    Assert.assertEquals(10987L, (long) driverConfiguration.getMaxTransactionRetryTimeMs().get());
+
+    driverConfiguration = driverConfiguration.withMaxConnectionPoolSize(101);
+    Assert.assertEquals(101, (int) driverConfiguration.getMaxConnectionPoolSize().get());
+  }
+
+  @Test
+  public void testDriverConfigurationErrors() throws Exception {
+    Neo4jIO.DriverConfiguration driverConfiguration = Neo4jIO.DriverConfiguration.create();
+
+    try {

Review comment:
       Instead of try/Assert/catch, use ExpectedException: https://junit.org/junit4/javadoc/4.13/org/junit/rules/ExpectedException.html

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();

Review comment:
       It's confusing to have two fields that are merged into one. Can we consolidate them?
   
   - Remove `getUrl`
   - Have a single `withUrls` method with varargs (`withUrls(String urls...)`) 

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();

Review comment:
       FYI: ValueProvider mostly exists to serve Dataflow ["classic" templates](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates), which are still supported, but usually we encourage folks to use [flex templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates) instead. ValueProviders exist because in classic templates, there is a gap between pipeline construction time and pipeline runtime. In flex templates, there is no real gap because pipelines are constructed and run on demand.
   
   So td;dr while ValueProviders may still be useful for some folks, I'll leave it up to your discretion which (if any) fields you want to be ValueProviders, and which are fine as regular values. 

##########
File path: sdks/java/io/neo4j/OWNERS
##########
@@ -0,0 +1,4 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
+
+reviewers:
+

Review comment:
       Can you add yourself as a reviewer here? Or you can delete this file entirely, since our existing OWNERS files are mostly unused and out of date.

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/RowToLineFn.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Test class not put into another class to prevent serialization errors of the parent class object.

Review comment:
       What if we make RowToLineFn a static subclass of Neo4jIOIT?

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jIOIT.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class Neo4jIOIT {
+
+  private static Network network;
+  private static Neo4jContainer<?> neo4jContainer;
+
+  @Rule public transient TestPipeline parameterizedReadPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline writeUnwindPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline largeWriteUnwindPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // network sharing doesn't work with ClassRule
+    network = Network.newNetwork();
+
+    neo4jContainer =
+        new Neo4jContainer<>(DockerImageName.parse("neo4j").withTag(Neo4jTestUtil.NEO4J_VERSION))
+            .withStartupAttempts(1)
+            .withExposedPorts(7687, 7474)
+            .withNetwork(network)
+            .withEnv(
+                "NEO4J_AUTH", Neo4jTestUtil.NEO4J_USERNAME + "/" + Neo4jTestUtil.NEO4J_PASSWORD)
+            .withEnv("NEO4J_dbms_default_listen_address", "0.0.0.0")
+            .withNetworkAliases(Neo4jTestUtil.NEO4J_HOSTNAME)
+            .withSharedMemorySize(256 * 1024 * 1024L); // 256MB
+
+    // Start Neo4j
+    neo4jContainer.start();
+
+    // Start with an empty database to use for testing.
+    // This prevents any possibility of some old data messing up the test results.
+    // We add a unique constraint to see we're not trying to create nodes twice in the larger test
+    // below
+    //
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE OR REPLACE DATABASE " + Neo4jTestUtil.NEO4J_DATABASE, false);
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE CONSTRAINT something_id_unique ON (n:Something) ASSERT n.id IS UNIQUE", true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    neo4jContainer.stop();
+    neo4jContainer.close();
+
+    try {
+      network.close();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+
+  @Test
+  public void testParameterizedRead() throws Exception {
+    PBegin begin = parameterizedReadPipeline.begin();
+    PCollection<String> stringsCollections =
+        begin.apply(Create.of(Arrays.asList("one", "two", "three")));
+
+    final Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("One", Schema.FieldType.INT32),
+            Schema.Field.of("Str", Schema.FieldType.STRING));
+
+    SerializableFunction<String, Map<String, Object>> parametersFunction =
+        string -> Collections.singletonMap("par1", string);
+
+    Neo4jIO.RowMapper<Row> rowMapper =
+        record -> {
+          int one = record.get(0).asInt();
+          String string = record.get(1).asString();
+          return Row.withSchema(outputSchema).attachValues(one, string);
+        };
+
+    Neo4jIO.ReadAll<String, Row> read =
+        Neo4jIO.<String, Row>readAll()
+            .withCypher("RETURN 1, $par1")
+            .withDatabase(Neo4jTestUtil.NEO4J_DATABASE)
+            .withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration())
+            .withReadTransaction()
+            .withFetchSize(5000)
+            .withRowMapper(rowMapper)
+            .withParametersFunction(parametersFunction)
+            .withCoder(SerializableCoder.of(Row.class))
+            .withCypherLogging();
+
+    PCollection<Row> outputRows = stringsCollections.apply(read);
+
+    PCollection<String> outputLines = outputRows.apply(ParDo.of(new RowToLineFn()));
+
+    PAssert.that(outputLines).containsInAnyOrder("1,one", "1,two", "1,three");
+
+    // Now run this pipeline
+    //
+    PipelineResult pipelineResult = parameterizedReadPipeline.run();
+
+    Assert.assertEquals(PipelineResult.State.DONE, pipelineResult.getState());
+  }
+
+  @Test
+  public void testWriteUnwind() throws Exception {
+    PBegin begin = writeUnwindPipeline.begin();
+    PCollection<String> stringsCollections =
+        begin.apply(Create.of(Arrays.asList("one", "two", "three")));
+
+    // Every row is represented by a Map<String, Object> in the parameters map.
+    // We accumulate the rows and 'unwind' those to Neo4j for performance reasons.
+    //
+    SerializableFunction<String, Map<String, Object>> parametersMapper =
+        name -> Collections.singletonMap("name", name);
+
+    Neo4jIO.WriteUnwind<String> read =
+        Neo4jIO.<String>writeUnwind()
+            .withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration())
+            .withDatabase(Neo4jTestUtil.NEO4J_DATABASE)
+            .withBatchSize(5000)
+            .withUnwindMapName("rows")
+            .withCypher("UNWIND $rows AS row MERGE(n:Num { name : row.name })")
+            .withParametersFunction(parametersMapper)
+            .withCypherLogging();
+
+    stringsCollections.apply(read);
+
+    // Now run this pipeline
+    //
+    PipelineResult pipelineResult = writeUnwindPipeline.run();
+
+    Assert.assertEquals(PipelineResult.State.DONE, pipelineResult.getState());
+
+    // Connect back to the Instance and verify that we have 3 nodes
+    //
+    try (Driver driver = Neo4jTestUtil.getDriver()) {
+      try (Session session = Neo4jTestUtil.getSession(driver, true)) {
+        List<String> names =
+            session.readTransaction(
+                tx -> {
+                  List<String> list = new ArrayList<>();
+                  Result result = tx.run("MATCH(n:Num) RETURN n.name");
+                  while (result.hasNext()) {
+                    Record record = result.next();
+                    list.add(record.get(0).asString());
+                  }
+                  return list;
+                });
+        Assert.assertEquals(3, names.size());

Review comment:
       Nit: you can simplify this to one call with 
   
   ```java
   assertThat(names, containsInAnyOrder("one", "two", "three"));
   ```

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : "<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : "<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();
+
+      if (getEncryption() != null && getEncryption().get() != null) {
+        if (getEncryption().get()) {
+          configBuilder =
+              Config.builder()
+                  .withEncryption()
+                  .withTrustStrategy(Config.TrustStrategy.trustAllCertificates());
+        } else {
+          configBuilder = Config.builder().withoutEncryption();
+        }
+      }
+
+      // physical layer
+      if (getConnectionLivenessCheckTimeoutMs() != null
+          && getConnectionLivenessCheckTimeoutMs().get() != null
+          && getConnectionLivenessCheckTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionLivenessCheckTimeout(
+                getConnectionLivenessCheckTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionLifetimeMs() != null
+          && getMaxConnectionLifetimeMs().get() != null
+          && getMaxConnectionLifetimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxConnectionLifetime(
+                getMaxConnectionLifetimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionPoolSize() != null && getMaxConnectionPoolSize().get() > 0) {
+        configBuilder = configBuilder.withMaxConnectionPoolSize(getMaxConnectionPoolSize().get());
+      }
+      if (getConnectionAcquisitionTimeoutMs() != null
+          && getConnectionAcquisitionTimeoutMs().get() != null
+          && getConnectionAcquisitionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionAcquisitionTimeout(
+                getConnectionAcquisitionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getConnectionTimeoutMs() != null
+          && getConnectionTimeoutMs().get() != null
+          && getConnectionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionTimeout(
+                getConnectionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxTransactionRetryTimeMs() != null
+          && getMaxTransactionRetryTimeMs().get() != null
+          && getMaxTransactionRetryTimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxTransactionRetryTime(
+                getMaxTransactionRetryTimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+
+      // Set sane Logging level
+      //
+      configBuilder = configBuilder.withLogging(Logging.javaUtilLogging(Level.WARNING));
+
+      // Now we have the configuration for the driver
+      //
+      Config config = configBuilder.build();
+
+      // Get the list of the URI to connect with
+      //
+      List<URI> uris = new ArrayList<>();
+      if (getUrl() != null && getUrl().get() != null) {
+        try {
+          uris.add(new URI(getUrl().get()));
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Error creating URI from URL '" + getUrl().get() + "'", e);
+        }
+      }
+      if (getUrls() != null && getUrls().get() != null) {
+        List<String> urls = getUrls().get();
+        for (String url : urls) {
+          try {
+            uris.add(new URI(url));
+          } catch (URISyntaxException e) {
+            throw new RuntimeException(
+                "Error creating URI '"
+                    + getUrl().get()
+                    + "' from a list of "
+                    + urls.size()
+                    + " URLs",
+                e);
+          }
+        }
+      }
+
+      checkArgument(
+          getUsername() != null && getUsername().get() != null,
+          "please provide a username to connect to Neo4j");
+      checkArgument(
+          getPassword() != null && getPassword().get() != null,
+          "please provide a password to connect to Neo4j");
+
+      Driver driver;
+      if (getRouting() != null && getRouting().get() != null && getRouting().get()) {
+        driver =
+            GraphDatabase.routingDriver(
+                uris, AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      } else {
+        // Just take the first URI that was provided
+        driver =
+            GraphDatabase.driver(
+                uris.get(0), AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      }
+
+      // Now we create a
+      return driver;
+    }
+
+    /**
+     * The Builder class below is not visible. We use it to service the "with" methods below the
+     * Builder class.
+     */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setUrl(ValueProvider<String> url);
+
+      abstract Builder setUrls(ValueProvider<List<String>> url);
+
+      abstract Builder setUsername(ValueProvider<String> username);
+
+      abstract Builder setPassword(ValueProvider<String> password);
+
+      abstract Builder setEncryption(ValueProvider<Boolean> encryption);
+
+      abstract Builder setConnectionLivenessCheckTimeoutMs(
+          ValueProvider<Long> connectionLivenessCheckTimeoutMs);
+
+      abstract Builder setMaxConnectionLifetimeMs(ValueProvider<Long> maxConnectionLifetimeMs);
+
+      abstract Builder setMaxConnectionPoolSize(ValueProvider<Integer> maxConnectionPoolSize);
+
+      abstract Builder setConnectionAcquisitionTimeoutMs(
+          ValueProvider<Long> connectionAcquisitionTimeoutMs);
+
+      abstract Builder setConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs);
+
+      abstract Builder setMaxTransactionRetryTimeMs(ValueProvider<Long> maxTransactionRetryTimeMs);
+
+      abstract Builder setRouting(ValueProvider<Boolean> routing);
+
+      abstract DriverConfiguration build();
+    }
+  }
+
+  /** This is the class which handles the work behind the {@link #readAll} method. */
+  @AutoValue
+  public abstract static class ReadAll<ParameterT, OutputT>
+      extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
+
+    abstract @Nullable SerializableFunction<Void, Driver> getDriverProviderFn();
+
+    abstract @Nullable ValueProvider<String> getDatabase();
+
+    abstract @Nullable ValueProvider<String> getCypher();
+
+    abstract @Nullable ValueProvider<Boolean> getWriteTransaction();
+
+    abstract @Nullable ValueProvider<Long> getTransactionTimeoutMs();
+
+    abstract @Nullable RowMapper<OutputT> getRowMapper();
+
+    abstract @Nullable SerializableFunction<ParameterT, Map<String, Object>>
+        getParametersFunction();
+
+    abstract @Nullable Coder<OutputT> getCoder();
+
+    abstract @Nullable ValueProvider<Long> getFetchSize();
+
+    abstract @Nullable ValueProvider<Boolean> getLogCypher();
+
+    abstract Builder<ParameterT, OutputT> toBuilder();
+
+    // Driver configuration
+    public ReadAll<ParameterT, OutputT> withDriverConfiguration(DriverConfiguration config) {
+      return toBuilder()
+          .setDriverProviderFn(new DriverProviderFromDriverConfiguration(config))
+          .build();
+    }
+
+    // Cypher
+    public ReadAll<ParameterT, OutputT> withCypher(String cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.readAll().withCypher(query) called with null cypher query");
+      return withCypher(ValueProvider.StaticValueProvider.of(cypher));
+    }
+
+    public ReadAll<ParameterT, OutputT> withCypher(ValueProvider<String> cypher) {
+      checkArgument(cypher != null, "Neo4jIO.readAll().withCypher(cypher) called with null cypher");
+      return toBuilder().setCypher(cypher).build();
+    }
+
+    // Transaction timeout
+    public ReadAll<ParameterT, OutputT> withTransactionTimeoutMs(long timeout) {
+      checkArgument(
+          timeout > 0,
+          "Neo4jIO.readAll().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return withTransactionTimeoutMs(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public ReadAll<ParameterT, OutputT> withTransactionTimeoutMs(ValueProvider<Long> timeout) {
+      checkArgument(
+          timeout != null && timeout.get() > 0,
+          "Neo4jIO.readAll().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return toBuilder().setTransactionTimeoutMs(timeout).build();
+    }
+
+    // Database
+    public ReadAll<ParameterT, OutputT> withDatabase(String database) {
+      checkArgument(
+          database != null, "Neo4jIO.readAll().withDatabase(database) called with null database");
+      return withDatabase(ValueProvider.StaticValueProvider.of(database));
+    }
+
+    public ReadAll<ParameterT, OutputT> withDatabase(ValueProvider<String> database) {
+      checkArgument(
+          database != null, "Neo4jIO.readAll().withDatabase(database) called with null database");
+      return toBuilder().setDatabase(database).build();
+    }
+
+    // Fetch size
+    public ReadAll<ParameterT, OutputT> withFetchSize(long fetchSize) {
+      checkArgument(
+          fetchSize > 0, "Neo4jIO.readAll().withFetchSize(query) called with fetchSize<=0");
+      return withFetchSize(ValueProvider.StaticValueProvider.of(fetchSize));
+    }
+
+    public ReadAll<ParameterT, OutputT> withFetchSize(ValueProvider<Long> fetchSize) {
+      checkArgument(
+          fetchSize != null && fetchSize.get() >= 0,
+          "Neo4jIO.readAll().withFetchSize(query) called with fetchSize<=0");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
+    // Row mapper
+    public ReadAll<ParameterT, OutputT> withRowMapper(RowMapper<OutputT> rowMapper) {
+      checkArgument(
+          rowMapper != null,
+          "Neo4jIO.readAll().withRowMapper(rowMapper) called with null rowMapper");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    // Parameters mapper
+    public ReadAll<ParameterT, OutputT> withParametersFunction(
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction) {
+      checkArgument(
+          parametersFunction != null,
+          "Neo4jIO.readAll().withParametersFunction(parametersFunction) called with null parametersFunction");
+      return toBuilder().setParametersFunction(parametersFunction).build();
+    }
+
+    // Coder
+    public ReadAll<ParameterT, OutputT> withCoder(Coder<OutputT> coder) {
+      checkArgument(coder != null, "Neo4jIO.readAll().withCoder(coder) called with null coder");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    // Read/Write transaction
+    public ReadAll<ParameterT, OutputT> withReadTransaction() {
+      return toBuilder()
+          .setWriteTransaction(ValueProvider.StaticValueProvider.of(Boolean.FALSE))
+          .build();
+    }
+
+    public ReadAll<ParameterT, OutputT> withWriteTransaction() {
+      return toBuilder()
+          .setWriteTransaction(ValueProvider.StaticValueProvider.of(Boolean.TRUE))
+          .build();
+    }
+
+    // Log cypher statements
+    public ReadAll<ParameterT, OutputT> withoutCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    public ReadAll<ParameterT, OutputT> withCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    @Override
+    public PCollection<OutputT> expand(PCollection<ParameterT> input) {
+
+      checkArgument(
+          getCypher() != null && getCypher().get() != null,
+          "please provide a cypher statement to execute");
+
+      long fetchSize;
+      if (getFetchSize() == null || getFetchSize().get() <= 0) {
+        fetchSize = 0;
+      } else {
+        fetchSize = getFetchSize().get();
+      }
+
+      String databaseName = null;
+      if (getDatabase() != null) {
+        databaseName = getDatabase().get();
+      }
+
+      boolean writeTransaction = false;
+      if (getWriteTransaction() != null) {
+        writeTransaction = getWriteTransaction().get();
+      }
+
+      long transactionTimeOutMs;
+      if (getTransactionTimeoutMs() == null || getTransactionTimeoutMs().get() < 0) {
+        transactionTimeOutMs = -1;
+      } else {
+        transactionTimeOutMs = getTransactionTimeoutMs().get();
+      }
+
+      boolean logCypher = false;
+      if (getLogCypher() != null) {
+        logCypher = getLogCypher().get();
+      }
+
+      ReadFn<ParameterT, OutputT> readFn =
+          new ReadFn<>(
+              getDriverProviderFn(),
+              getCypher().get(),
+              getRowMapper(),
+              getParametersFunction(),
+              fetchSize,
+              databaseName,
+              writeTransaction,
+              transactionTimeOutMs,
+              logCypher);
+
+      return getOutputPCollection(input, readFn, getCoder());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("cypher", getCypher()));
+      builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+      builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+      if (getDriverProviderFn() instanceof HasDisplayData) {
+        ((HasDisplayData) getDriverProviderFn()).populateDisplayData(builder);
+      }
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<ParameterT, OutputT> {
+      abstract Builder<ParameterT, OutputT> setDriverProviderFn(
+          SerializableFunction<Void, Driver> driverProviderFn);
+
+      abstract Builder<ParameterT, OutputT> setCypher(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT, OutputT> setDatabase(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT, OutputT> setWriteTransaction(
+          ValueProvider<Boolean> writeTransaction);
+
+      abstract Builder<ParameterT, OutputT> setTransactionTimeoutMs(
+          ValueProvider<Long> transactionTimeoutMs);
+
+      abstract Builder<ParameterT, OutputT> setRowMapper(RowMapper<OutputT> rowMapper);
+
+      abstract Builder<ParameterT, OutputT> setParametersFunction(
+          SerializableFunction<ParameterT, Map<String, Object>> parametersFunction);
+
+      abstract Builder<ParameterT, OutputT> setCoder(Coder<OutputT> coder);
+
+      abstract Builder<ParameterT, OutputT> setFetchSize(ValueProvider<Long> fetchSize);
+
+      abstract Builder<ParameterT, OutputT> setLogCypher(ValueProvider<Boolean> logCypher);
+
+      abstract ReadAll<ParameterT, OutputT> build();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class ReadWriteFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+    protected final SerializableFunction<Void, Driver> driverProviderFn;
+    protected final String databaseName;
+    protected final long fetchSize;
+    protected final long transactionTimeoutMs;
+
+    protected transient Driver driver;
+    protected transient Session session;
+    protected transient TransactionConfig transactionConfig;
+
+    private ReadWriteFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String databaseName,
+        long fetchSize,
+        long transactionTimeoutMs) {
+      this.driverProviderFn = driverProviderFn;
+      this.databaseName = databaseName;
+      this.fetchSize = fetchSize;
+      this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+
+    /**
+     * Delay the creation of driver and session until we actually have data to do something with.
+     */
+    @Setup
+    public void setup() {}
+
+    protected void buildDriverAndSession() {
+      if (driver == null && session == null && transactionConfig == null) {
+        driver = driverProviderFn.apply(null);
+
+        SessionConfig.Builder builder = SessionConfig.builder();
+        if (databaseName != null) {
+          builder = builder.withDatabase(databaseName);
+        }
+        builder = builder.withFetchSize(fetchSize);
+        session = driver.session(builder.build());
+
+        TransactionConfig.Builder configBuilder = TransactionConfig.builder();
+        if (transactionTimeoutMs > 0) {
+          configBuilder = configBuilder.withTimeout(Duration.ofMillis(transactionTimeoutMs));
+        }
+        transactionConfig = configBuilder.build();
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      cleanUpDriverSession();
+    }
+
+    @Override
+    protected void finalize() {
+      cleanUpDriverSession();
+    }
+
+    protected void cleanUpDriverSession() {
+      if (session != null) {
+        try {
+          session.close();
+        } finally {
+          session = null;
+        }
+      }
+      if (driver != null) {
+        try {
+          driver.close();
+        } finally {
+          driver = null;
+        }
+      }
+    }
+
+    protected String getParametersString(Map<String, Object> parametersMap) {
+      StringBuilder parametersString = new StringBuilder();
+      parametersMap
+          .keySet()
+          .forEach(
+              key -> {
+                if (parametersString.length() > 0) {
+                  parametersString.append(',');
+                }
+                parametersString.append(key).append('=');
+                Object value = parametersMap.get(key);
+                if (value == null) {
+                  parametersString.append("<null>");
+                } else {
+                  parametersString.append(value);
+                }
+              });
+      return parametersString.toString();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class ReadFn<ParameterT, OutputT> extends ReadWriteFn<ParameterT, OutputT> {
+    private final String cypher;
+    private final RowMapper<OutputT> rowMapper;
+    private final SerializableFunction<ParameterT, Map<String, Object>> parametersFunction;
+    private final boolean writeTransaction;
+    private final boolean logCypher;
+
+    private ReadFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String cypher,
+        RowMapper<OutputT> rowMapper,
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction,
+        long fetchSize,
+        String databaseName,
+        boolean writeTransaction,
+        long transactionTimeoutMs,
+        boolean logCypher) {
+      super(driverProviderFn, databaseName, fetchSize, transactionTimeoutMs);
+      this.cypher = cypher;
+      this.rowMapper = rowMapper;
+      this.parametersFunction = parametersFunction;
+      this.writeTransaction = writeTransaction;
+      this.logCypher = logCypher;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      // Some initialization needs to take place as late as possible.
+      // If this method is not called (no input) then we don't need to create a session
+      //
+      if (session == null) {
+        buildDriverAndSession();
+      }
+
+      // Map the input data to the parameters map...
+      //
+      ParameterT parameters = context.element();
+      final Map<String, Object> parametersMap;
+      if (parametersFunction != null) {
+        parametersMap = parametersFunction.apply(parameters);
+      } else {
+        parametersMap = Collections.emptyMap();
+      }
+      executeReadCypherStatement(context, parametersMap);
+    }
+
+    private void executeReadCypherStatement(
+        final ProcessContext processContext, Map<String, Object> parametersMap) {
+      // The transaction.
+      //
+      TransactionWork<Void> transactionWork =
+          transaction -> {
+            Result result = transaction.run(cypher, parametersMap);
+            while (result.hasNext()) {
+              Record record = result.next();
+              try {
+                OutputT outputT = rowMapper.mapRow(record);
+                processContext.output(outputT);
+              } catch (Exception e) {
+                throw new RuntimeException("error mapping Neo4j record to row", e);
+              }
+            }
+
+            // No specific Neo4j transaction output beyond what goes to the output
+            return null;
+          };
+
+      if (logCypher) {
+        String parametersString = getParametersString(parametersMap);
+
+        String readWrite = writeTransaction ? "write" : "read";
+        LOG.info(
+            "Starting a "
+                + readWrite
+                + " transaction for cypher: "
+                + cypher
+                + ", parameters: "
+                + parametersString);
+      }
+
+      // There are 2 ways to do a transaction on Neo4j: read or write
+      //
+      if (writeTransaction) {
+        session.writeTransaction(transactionWork, transactionConfig);
+      } else {
+        session.readTransaction(transactionWork, transactionConfig);
+      }
+    }
+  }
+
+  /**
+   * Wraps a {@link DriverConfiguration} to provide a {@link Driver}.
+   *
+   * <p>At most a single {@link Driver} instance will be constructed during pipeline execution for
+   * each unique {@link DriverConfiguration} within the pipeline.
+   */
+  public static class DriverProviderFromDriverConfiguration
+      implements SerializableFunction<Void, Driver>, HasDisplayData {
+    private static final ConcurrentHashMap<DriverConfiguration, Driver> instances =
+        new ConcurrentHashMap<>();
+    private final DriverConfiguration config;
+
+    private DriverProviderFromDriverConfiguration(DriverConfiguration config) {
+      this.config = config;
+    }
+
+    public static SerializableFunction<Void, Driver> of(DriverConfiguration config) {
+      return new DriverProviderFromDriverConfiguration(config);
+    }
+
+    @Override
+    public Driver apply(Void input) {
+      return config.buildDriver();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      config.populateDisplayData(builder);
+    }
+  }
+
+  /** This is the class which handles the work behind the {@link #writeUnwind()} method. */
+  @AutoValue
+  public abstract static class WriteUnwind<ParameterT>
+      extends PTransform<PCollection<ParameterT>, PDone> {
+
+    abstract @Nullable SerializableFunction<Void, Driver> getDriverProviderFn();
+
+    abstract @Nullable ValueProvider<String> getDatabase();
+
+    abstract @Nullable ValueProvider<String> getCypher();
+
+    abstract @Nullable ValueProvider<String> getUnwindMapName();
+
+    abstract @Nullable ValueProvider<Long> getTransactionTimeoutMs();
+
+    abstract @Nullable SerializableFunction<ParameterT, Map<String, Object>>
+        getParametersFunction();
+
+    abstract @Nullable ValueProvider<Long> getBatchSize();
+
+    abstract @Nullable ValueProvider<Boolean> getLogCypher();
+
+    abstract Builder<ParameterT> toBuilder();
+
+    // Driver configuration
+    public WriteUnwind<ParameterT> withDriverConfiguration(DriverConfiguration config) {
+      return toBuilder()
+          .setDriverProviderFn(new DriverProviderFromDriverConfiguration(config))
+          .build();
+    }
+
+    // Cypher
+    public WriteUnwind<ParameterT> withCypher(String cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.writeUnwind().withCypher(query) called with null cypher query");
+      return withCypher(ValueProvider.StaticValueProvider.of(cypher));
+    }
+
+    public WriteUnwind<ParameterT> withCypher(ValueProvider<String> cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.writeUnwind().withCypher(cypher) called with null cypher");
+      return toBuilder().setCypher(cypher).build();
+    }
+
+    // UnwindMapName
+    public WriteUnwind<ParameterT> withUnwindMapName(String mapName) {
+      checkArgument(
+          mapName != null,
+          "Neo4jIO.writeUnwind().withUnwindMapName(query) called with null mapName");
+      return withUnwindMapName(ValueProvider.StaticValueProvider.of(mapName));
+    }
+
+    public WriteUnwind<ParameterT> withUnwindMapName(ValueProvider<String> mapName) {
+      checkArgument(
+          mapName != null,
+          "Neo4jIO.writeUnwind().withUnwindMapName(cypher) called with null mapName");
+      return toBuilder().setUnwindMapName(mapName).build();
+    }
+
+    // Transaction timeout
+    public WriteUnwind<ParameterT> withTransactionTimeoutMs(long timeout) {
+      checkArgument(
+          timeout > 0,
+          "Neo4jIO.writeUnwind().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return withTransactionTimeoutMs(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public WriteUnwind<ParameterT> withTransactionTimeoutMs(ValueProvider<Long> timeout) {
+      checkArgument(
+          timeout != null && timeout.get() > 0,
+          "Neo4jIO.writeUnwind().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return toBuilder().setTransactionTimeoutMs(timeout).build();
+    }
+
+    // Database
+    public WriteUnwind<ParameterT> withDatabase(String database) {
+      checkArgument(
+          database != null,
+          "Neo4jIO.writeUnwind().withDatabase(database) called with null database");
+      return withDatabase(ValueProvider.StaticValueProvider.of(database));
+    }
+
+    public WriteUnwind<ParameterT> withDatabase(ValueProvider<String> database) {
+      checkArgument(
+          database != null,
+          "Neo4jIO.writeUnwind().withDatabase(database) called with null database");
+      return toBuilder().setDatabase(database).build();
+    }
+
+    // Batch size
+    public WriteUnwind<ParameterT> withBatchSize(long batchSize) {
+      checkArgument(
+          batchSize > 0, "Neo4jIO.writeUnwind().withFetchSize(query) called with batchSize<=0");
+      return withBatchSize(ValueProvider.StaticValueProvider.of(batchSize));
+    }
+
+    public WriteUnwind<ParameterT> withBatchSize(ValueProvider<Long> batchSize) {
+      checkArgument(
+          batchSize != null && batchSize.get() >= 0,
+          "Neo4jIO.readAll().withBatchSize(query) called with batchSize<=0");
+      return toBuilder().setBatchSize(batchSize).build();
+    }
+
+    // Parameters mapper
+    public WriteUnwind<ParameterT> withParametersFunction(
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction) {
+      checkArgument(
+          parametersFunction != null,
+          "Neo4jIO.readAll().withParametersFunction(parametersFunction) called with null parametersFunction");
+      return toBuilder().setParametersFunction(parametersFunction).build();
+    }
+
+    // Logging
+    public WriteUnwind<ParameterT> withCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<ParameterT> input) {
+
+      checkArgument(
+          getCypher() != null && getCypher().get() != null,
+          "please provide an unwind cypher statement to execute");
+      checkArgument(
+          getUnwindMapName() != null && getUnwindMapName().get() != null,
+          "please provide an unwind map name");
+
+      String databaseName = null;
+      if (getDatabase() != null) {
+        databaseName = getDatabase().get();
+      }
+
+      String unwindMapName = null;
+      if (getUnwindMapName() != null) {
+        unwindMapName = getUnwindMapName().get();
+      }
+
+      long transactionTimeOutMs;
+      if (getTransactionTimeoutMs() == null || getTransactionTimeoutMs().get() < 0) {
+        transactionTimeOutMs = -1;
+      } else {
+        transactionTimeOutMs = getTransactionTimeoutMs().get();
+      }
+
+      long batchSize;
+      if (getBatchSize() == null || getBatchSize().get() <= 0) {
+        batchSize = 1;
+      } else {
+        batchSize = getBatchSize().get();
+      }
+
+      boolean logCypher = false;
+      if (getLogCypher() != null) {
+        logCypher = getLogCypher().get();
+      }
+
+      WriteUnwindFn<ParameterT> writeFn =
+          new WriteUnwindFn<>(
+              getDriverProviderFn(),
+              getCypher().get(),
+              getParametersFunction(),
+              -1,
+              databaseName,
+              transactionTimeOutMs,
+              batchSize,
+              logCypher,
+              unwindMapName);
+
+      input.apply(ParDo.of(writeFn));
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("cypher", getCypher()));
+      if (getDriverProviderFn() instanceof HasDisplayData) {
+        ((HasDisplayData) getDriverProviderFn()).populateDisplayData(builder);
+      }
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<ParameterT> {
+      abstract Builder<ParameterT> setDriverProviderFn(
+          SerializableFunction<Void, Driver> driverProviderFn);
+
+      abstract Builder<ParameterT> setCypher(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT> setUnwindMapName(ValueProvider<String> unwindMapName);
+
+      abstract Builder<ParameterT> setDatabase(ValueProvider<String> database);
+
+      abstract Builder<ParameterT> setTransactionTimeoutMs(
+          ValueProvider<Long> transactionTimeoutMs);
+
+      abstract Builder<ParameterT> setParametersFunction(
+          SerializableFunction<ParameterT, Map<String, Object>> parametersFunction);
+
+      abstract Builder<ParameterT> setBatchSize(ValueProvider<Long> batchSize);
+
+      abstract Builder<ParameterT> setLogCypher(ValueProvider<Boolean> logCypher);
+
+      abstract WriteUnwind<ParameterT> build();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class WriteUnwindFn<ParameterT> extends ReadWriteFn<ParameterT, Void> {
+
+    private final String cypher;
+    private final SerializableFunction<ParameterT, Map<String, Object>> parametersFunction;
+    private final boolean logCypher;
+    private final long batchSize;
+    private final String unwindMapName;
+
+    private long elementsInput;
+    private boolean loggingDone;
+    private List<Map<String, Object>> unwindList;
+
+    private WriteUnwindFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String cypher,
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction,
+        long fetchSize,
+        String databaseName,
+        long transactionTimeoutMs,
+        long batchSize,
+        boolean logCypher,
+        String unwindMapName) {
+      super(driverProviderFn, databaseName, fetchSize, transactionTimeoutMs);
+      this.cypher = cypher;
+      this.parametersFunction = parametersFunction;
+      this.logCypher = logCypher;
+      this.batchSize = batchSize;
+      this.unwindMapName = unwindMapName;
+
+      unwindList = new ArrayList<>();
+
+      elementsInput = 0;
+      loggingDone = false;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      if (session == null) {
+        buildDriverAndSession();
+      }
+
+      // Map the input data to the parameters map...
+      //
+      ParameterT parameters = context.element();
+      if (parametersFunction != null) {
+        // Every input element creates a new Map<String,Object> entry in unwindList
+        //
+        unwindList.add(parametersFunction.apply(parameters));
+      } else {
+        // Someone is writing a bunch of static or procedurally generated values to Neo4j
+        unwindList.add(Collections.emptyMap());
+      }
+      elementsInput++;

Review comment:
       If the expectation is that always `unwindList.size() == elementsInput`, why don't we remove `elementsInput` and just use `unwindList.size()`?

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jIOIT.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class Neo4jIOIT {
+
+  private static Network network;
+  private static Neo4jContainer<?> neo4jContainer;
+
+  @Rule public transient TestPipeline parameterizedReadPipeline = TestPipeline.create();

Review comment:
       Nit: it is acceptable (and common practice) to use a single `TestPipeline` for multiple tests in the same class.

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jIOIT.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class Neo4jIOIT {
+
+  private static Network network;
+  private static Neo4jContainer<?> neo4jContainer;
+
+  @Rule public transient TestPipeline parameterizedReadPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline writeUnwindPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline largeWriteUnwindPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // network sharing doesn't work with ClassRule
+    network = Network.newNetwork();
+
+    neo4jContainer =
+        new Neo4jContainer<>(DockerImageName.parse("neo4j").withTag(Neo4jTestUtil.NEO4J_VERSION))
+            .withStartupAttempts(1)
+            .withExposedPorts(7687, 7474)
+            .withNetwork(network)
+            .withEnv(
+                "NEO4J_AUTH", Neo4jTestUtil.NEO4J_USERNAME + "/" + Neo4jTestUtil.NEO4J_PASSWORD)
+            .withEnv("NEO4J_dbms_default_listen_address", "0.0.0.0")
+            .withNetworkAliases(Neo4jTestUtil.NEO4J_HOSTNAME)
+            .withSharedMemorySize(256 * 1024 * 1024L); // 256MB
+
+    // Start Neo4j
+    neo4jContainer.start();
+
+    // Start with an empty database to use for testing.
+    // This prevents any possibility of some old data messing up the test results.
+    // We add a unique constraint to see we're not trying to create nodes twice in the larger test
+    // below
+    //
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE OR REPLACE DATABASE " + Neo4jTestUtil.NEO4J_DATABASE, false);
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE CONSTRAINT something_id_unique ON (n:Something) ASSERT n.id IS UNIQUE", true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    neo4jContainer.stop();
+    neo4jContainer.close();
+
+    try {
+      network.close();
+    } catch (Exception e) {
+      // ignore

Review comment:
       Let's log a warning with the exception in case there is an issue.

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : "<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : "<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();
+
+      if (getEncryption() != null && getEncryption().get() != null) {
+        if (getEncryption().get()) {
+          configBuilder =
+              Config.builder()
+                  .withEncryption()
+                  .withTrustStrategy(Config.TrustStrategy.trustAllCertificates());
+        } else {
+          configBuilder = Config.builder().withoutEncryption();
+        }
+      }
+
+      // physical layer
+      if (getConnectionLivenessCheckTimeoutMs() != null
+          && getConnectionLivenessCheckTimeoutMs().get() != null
+          && getConnectionLivenessCheckTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionLivenessCheckTimeout(
+                getConnectionLivenessCheckTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionLifetimeMs() != null
+          && getMaxConnectionLifetimeMs().get() != null
+          && getMaxConnectionLifetimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxConnectionLifetime(
+                getMaxConnectionLifetimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionPoolSize() != null && getMaxConnectionPoolSize().get() > 0) {
+        configBuilder = configBuilder.withMaxConnectionPoolSize(getMaxConnectionPoolSize().get());
+      }
+      if (getConnectionAcquisitionTimeoutMs() != null
+          && getConnectionAcquisitionTimeoutMs().get() != null
+          && getConnectionAcquisitionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionAcquisitionTimeout(
+                getConnectionAcquisitionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getConnectionTimeoutMs() != null
+          && getConnectionTimeoutMs().get() != null
+          && getConnectionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionTimeout(
+                getConnectionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxTransactionRetryTimeMs() != null
+          && getMaxTransactionRetryTimeMs().get() != null
+          && getMaxTransactionRetryTimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxTransactionRetryTime(
+                getMaxTransactionRetryTimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+
+      // Set sane Logging level
+      //
+      configBuilder = configBuilder.withLogging(Logging.javaUtilLogging(Level.WARNING));
+
+      // Now we have the configuration for the driver
+      //
+      Config config = configBuilder.build();
+
+      // Get the list of the URI to connect with
+      //
+      List<URI> uris = new ArrayList<>();
+      if (getUrl() != null && getUrl().get() != null) {
+        try {
+          uris.add(new URI(getUrl().get()));
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Error creating URI from URL '" + getUrl().get() + "'", e);
+        }
+      }
+      if (getUrls() != null && getUrls().get() != null) {
+        List<String> urls = getUrls().get();
+        for (String url : urls) {
+          try {
+            uris.add(new URI(url));
+          } catch (URISyntaxException e) {
+            throw new RuntimeException(
+                "Error creating URI '"
+                    + getUrl().get()
+                    + "' from a list of "
+                    + urls.size()
+                    + " URLs",
+                e);
+          }
+        }
+      }
+
+      checkArgument(
+          getUsername() != null && getUsername().get() != null,
+          "please provide a username to connect to Neo4j");
+      checkArgument(
+          getPassword() != null && getPassword().get() != null,
+          "please provide a password to connect to Neo4j");
+
+      Driver driver;
+      if (getRouting() != null && getRouting().get() != null && getRouting().get()) {
+        driver =
+            GraphDatabase.routingDriver(
+                uris, AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      } else {
+        // Just take the first URI that was provided
+        driver =
+            GraphDatabase.driver(
+                uris.get(0), AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      }
+
+      // Now we create a

Review comment:
       Nit: incomplete sentence

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jTestUtil.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+
+public class Neo4jTestUtil {
+
+  public static final String NEO4J_VERSION = "4.3.6";

Review comment:
       Should this match the 4.3.4 version set in build.gradle?

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : "<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : "<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();
+
+      if (getEncryption() != null && getEncryption().get() != null) {
+        if (getEncryption().get()) {
+          configBuilder =
+              Config.builder()
+                  .withEncryption()
+                  .withTrustStrategy(Config.TrustStrategy.trustAllCertificates());
+        } else {
+          configBuilder = Config.builder().withoutEncryption();
+        }
+      }
+
+      // physical layer
+      if (getConnectionLivenessCheckTimeoutMs() != null
+          && getConnectionLivenessCheckTimeoutMs().get() != null
+          && getConnectionLivenessCheckTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionLivenessCheckTimeout(
+                getConnectionLivenessCheckTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionLifetimeMs() != null
+          && getMaxConnectionLifetimeMs().get() != null
+          && getMaxConnectionLifetimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxConnectionLifetime(
+                getMaxConnectionLifetimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionPoolSize() != null && getMaxConnectionPoolSize().get() > 0) {
+        configBuilder = configBuilder.withMaxConnectionPoolSize(getMaxConnectionPoolSize().get());
+      }
+      if (getConnectionAcquisitionTimeoutMs() != null
+          && getConnectionAcquisitionTimeoutMs().get() != null
+          && getConnectionAcquisitionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionAcquisitionTimeout(
+                getConnectionAcquisitionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getConnectionTimeoutMs() != null
+          && getConnectionTimeoutMs().get() != null
+          && getConnectionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionTimeout(
+                getConnectionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxTransactionRetryTimeMs() != null
+          && getMaxTransactionRetryTimeMs().get() != null
+          && getMaxTransactionRetryTimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxTransactionRetryTime(
+                getMaxTransactionRetryTimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+
+      // Set sane Logging level
+      //
+      configBuilder = configBuilder.withLogging(Logging.javaUtilLogging(Level.WARNING));

Review comment:
       Is it worthwhile to make the logging level configurable? (see my other comment about `Config`)

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jIOIT.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class Neo4jIOIT {
+
+  private static Network network;
+  private static Neo4jContainer<?> neo4jContainer;
+
+  @Rule public transient TestPipeline parameterizedReadPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline writeUnwindPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline largeWriteUnwindPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // network sharing doesn't work with ClassRule
+    network = Network.newNetwork();
+
+    neo4jContainer =
+        new Neo4jContainer<>(DockerImageName.parse("neo4j").withTag(Neo4jTestUtil.NEO4J_VERSION))
+            .withStartupAttempts(1)
+            .withExposedPorts(7687, 7474)
+            .withNetwork(network)
+            .withEnv(
+                "NEO4J_AUTH", Neo4jTestUtil.NEO4J_USERNAME + "/" + Neo4jTestUtil.NEO4J_PASSWORD)
+            .withEnv("NEO4J_dbms_default_listen_address", "0.0.0.0")
+            .withNetworkAliases(Neo4jTestUtil.NEO4J_HOSTNAME)
+            .withSharedMemorySize(256 * 1024 * 1024L); // 256MB
+
+    // Start Neo4j
+    neo4jContainer.start();
+
+    // Start with an empty database to use for testing.
+    // This prevents any possibility of some old data messing up the test results.
+    // We add a unique constraint to see we're not trying to create nodes twice in the larger test
+    // below
+    //
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE OR REPLACE DATABASE " + Neo4jTestUtil.NEO4J_DATABASE, false);
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE CONSTRAINT something_id_unique ON (n:Something) ASSERT n.id IS UNIQUE", true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    neo4jContainer.stop();
+    neo4jContainer.close();
+
+    try {
+      network.close();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+
+  @Test
+  public void testParameterizedRead() throws Exception {
+    PBegin begin = parameterizedReadPipeline.begin();
+    PCollection<String> stringsCollections =
+        begin.apply(Create.of(Arrays.asList("one", "two", "three")));
+
+    final Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("One", Schema.FieldType.INT32),
+            Schema.Field.of("Str", Schema.FieldType.STRING));
+
+    SerializableFunction<String, Map<String, Object>> parametersFunction =
+        string -> Collections.singletonMap("par1", string);
+
+    Neo4jIO.RowMapper<Row> rowMapper =
+        record -> {
+          int one = record.get(0).asInt();
+          String string = record.get(1).asString();
+          return Row.withSchema(outputSchema).attachValues(one, string);
+        };
+
+    Neo4jIO.ReadAll<String, Row> read =
+        Neo4jIO.<String, Row>readAll()
+            .withCypher("RETURN 1, $par1")
+            .withDatabase(Neo4jTestUtil.NEO4J_DATABASE)
+            .withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration())
+            .withReadTransaction()
+            .withFetchSize(5000)
+            .withRowMapper(rowMapper)
+            .withParametersFunction(parametersFunction)
+            .withCoder(SerializableCoder.of(Row.class))
+            .withCypherLogging();
+
+    PCollection<Row> outputRows = stringsCollections.apply(read);
+
+    PCollection<String> outputLines = outputRows.apply(ParDo.of(new RowToLineFn()));
+
+    PAssert.that(outputLines).containsInAnyOrder("1,one", "1,two", "1,three");
+
+    // Now run this pipeline
+    //
+    PipelineResult pipelineResult = parameterizedReadPipeline.run();
+
+    Assert.assertEquals(PipelineResult.State.DONE, pipelineResult.getState());
+  }
+
+  @Test
+  public void testWriteUnwind() throws Exception {
+    PBegin begin = writeUnwindPipeline.begin();
+    PCollection<String> stringsCollections =
+        begin.apply(Create.of(Arrays.asList("one", "two", "three")));
+
+    // Every row is represented by a Map<String, Object> in the parameters map.
+    // We accumulate the rows and 'unwind' those to Neo4j for performance reasons.
+    //
+    SerializableFunction<String, Map<String, Object>> parametersMapper =
+        name -> Collections.singletonMap("name", name);
+
+    Neo4jIO.WriteUnwind<String> read =
+        Neo4jIO.<String>writeUnwind()
+            .withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration())
+            .withDatabase(Neo4jTestUtil.NEO4J_DATABASE)
+            .withBatchSize(5000)
+            .withUnwindMapName("rows")
+            .withCypher("UNWIND $rows AS row MERGE(n:Num { name : row.name })")
+            .withParametersFunction(parametersMapper)
+            .withCypherLogging();
+
+    stringsCollections.apply(read);
+
+    // Now run this pipeline
+    //
+    PipelineResult pipelineResult = writeUnwindPipeline.run();
+
+    Assert.assertEquals(PipelineResult.State.DONE, pipelineResult.getState());
+
+    // Connect back to the Instance and verify that we have 3 nodes
+    //
+    try (Driver driver = Neo4jTestUtil.getDriver()) {
+      try (Session session = Neo4jTestUtil.getSession(driver, true)) {
+        List<String> names =
+            session.readTransaction(
+                tx -> {
+                  List<String> list = new ArrayList<>();
+                  Result result = tx.run("MATCH(n:Num) RETURN n.name");
+                  while (result.hasNext()) {
+                    Record record = result.next();
+                    list.add(record.get(0).asString());
+                  }
+                  return list;
+                });
+        Assert.assertEquals(3, names.size());
+        Assert.assertTrue(names.contains("one"));
+        Assert.assertTrue(names.contains("two"));
+        Assert.assertTrue(names.contains("three"));
+      }
+    }
+  }
+
+  @Test
+  public void testLargeWriteUnwind() throws Exception {
+    PBegin begin = largeWriteUnwindPipeline.begin();
+
+    // Create 1000 IDs
+    List<Integer> idList = new ArrayList<>();
+    for (int i = 5000; i < 6000; i++) {
+      idList.add(i);
+    }
+    PCollection<Integer> idCollection = begin.apply(Create.of(idList));
+
+    // Every row is represented by a Map<String, Object> in the parameters map.
+    // We accumulate the rows and 'unwind' those to Neo4j for performance reasons.
+    //
+    SerializableFunction<Integer, Map<String, Object>> parametersFunction =
+        id -> ImmutableMap.of("id", id, "name", "Casters", "firstName", "Matt");
+
+    // 1000 rows with a batch size of 123 should trigger most scenarios we can think of
+    // We've put a unique constraint on Something.id
+    //
+    Neo4jIO.WriteUnwind<Integer> read =
+        Neo4jIO.<Integer>writeUnwind()
+            .withDriverConfiguration(Neo4jTestUtil.getDriverConfiguration())
+            .withBatchSize(123)
+            .withUnwindMapName("rows")
+            .withCypher("UNWIND $rows AS row CREATE(n:Something { id : row.id })")
+            .withDatabase(Neo4jTestUtil.NEO4J_DATABASE)
+            .withParametersFunction(parametersFunction)
+            .withCypherLogging();
+
+    idCollection.apply(read);
+
+    // Now run this pipeline
+    //
+    PipelineResult pipelineResult = largeWriteUnwindPipeline.run();
+
+    Assert.assertEquals(PipelineResult.State.DONE, pipelineResult.getState());
+
+    // Connect back to the Instance and verify that we have 1000 Something nodes
+    //
+    try (Driver driver = Neo4jTestUtil.getDriver()) {
+      try (Session session = Neo4jTestUtil.getSession(driver, true)) {
+        int[] values =
+            session.readTransaction(
+                tx -> {
+                  int[] v = new int[4];
+                  int nrRows = 0;
+                  Result result =
+                      tx.run("MATCH(n:Something) RETURN count(n), min(n.id), max(n.id)");
+                  while (result.hasNext()) {
+                    Record record = result.next();

Review comment:
       Nit: is it possible do the assertions directly inside this lambda?
   
   ```java
   Record record = result.hasNext();
   Assert.assertEquals(1000, record.get(0).asInt());
   ...
   Assert.assertFalse(result.hasNext());
   ```

##########
File path: sdks/java/io/neo4j/src/test/java/org/apache/beam/sdk/io/neo4j/Neo4jIOIT.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+@RunWith(JUnit4.class)
+public class Neo4jIOIT {
+
+  private static Network network;
+  private static Neo4jContainer<?> neo4jContainer;
+
+  @Rule public transient TestPipeline parameterizedReadPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline writeUnwindPipeline = TestPipeline.create();
+  @Rule public transient TestPipeline largeWriteUnwindPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // network sharing doesn't work with ClassRule
+    network = Network.newNetwork();
+
+    neo4jContainer =
+        new Neo4jContainer<>(DockerImageName.parse("neo4j").withTag(Neo4jTestUtil.NEO4J_VERSION))
+            .withStartupAttempts(1)
+            .withExposedPorts(7687, 7474)
+            .withNetwork(network)
+            .withEnv(
+                "NEO4J_AUTH", Neo4jTestUtil.NEO4J_USERNAME + "/" + Neo4jTestUtil.NEO4J_PASSWORD)
+            .withEnv("NEO4J_dbms_default_listen_address", "0.0.0.0")
+            .withNetworkAliases(Neo4jTestUtil.NEO4J_HOSTNAME)
+            .withSharedMemorySize(256 * 1024 * 1024L); // 256MB
+
+    // Start Neo4j
+    neo4jContainer.start();
+
+    // Start with an empty database to use for testing.
+    // This prevents any possibility of some old data messing up the test results.
+    // We add a unique constraint to see we're not trying to create nodes twice in the larger test
+    // below
+    //
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE OR REPLACE DATABASE " + Neo4jTestUtil.NEO4J_DATABASE, false);
+    Neo4jTestUtil.executeOnNeo4j(
+        "CREATE CONSTRAINT something_id_unique ON (n:Something) ASSERT n.id IS UNIQUE", true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    neo4jContainer.stop();
+    neo4jContainer.close();
+
+    try {
+      network.close();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+
+  @Test
+  public void testParameterizedRead() throws Exception {
+    PBegin begin = parameterizedReadPipeline.begin();
+    PCollection<String> stringsCollections =
+        begin.apply(Create.of(Arrays.asList("one", "two", "three")));

Review comment:
       Nit:
   
   ```java
       PCollection<String> stringsCollections =
           pipeline.apply(Create.of(Arrays.asList("one", "two", "three")));
   ```
   
   is exactly the same as 
   
   ```java
       PBegin begin = parameterizedReadPipeline.begin();
       PCollection<String> stringsCollections =
           begin.apply(Create.of(Arrays.asList("one", "two", "three")));
   ```
   for a fresh `pipeline`.

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : "<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : "<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();
+
+      if (getEncryption() != null && getEncryption().get() != null) {
+        if (getEncryption().get()) {
+          configBuilder =
+              Config.builder()
+                  .withEncryption()
+                  .withTrustStrategy(Config.TrustStrategy.trustAllCertificates());
+        } else {
+          configBuilder = Config.builder().withoutEncryption();
+        }
+      }
+
+      // physical layer
+      if (getConnectionLivenessCheckTimeoutMs() != null
+          && getConnectionLivenessCheckTimeoutMs().get() != null
+          && getConnectionLivenessCheckTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionLivenessCheckTimeout(
+                getConnectionLivenessCheckTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionLifetimeMs() != null
+          && getMaxConnectionLifetimeMs().get() != null
+          && getMaxConnectionLifetimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxConnectionLifetime(
+                getMaxConnectionLifetimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxConnectionPoolSize() != null && getMaxConnectionPoolSize().get() > 0) {
+        configBuilder = configBuilder.withMaxConnectionPoolSize(getMaxConnectionPoolSize().get());
+      }
+      if (getConnectionAcquisitionTimeoutMs() != null
+          && getConnectionAcquisitionTimeoutMs().get() != null
+          && getConnectionAcquisitionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionAcquisitionTimeout(
+                getConnectionAcquisitionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getConnectionTimeoutMs() != null
+          && getConnectionTimeoutMs().get() != null
+          && getConnectionTimeoutMs().get() > 0) {
+        configBuilder =
+            configBuilder.withConnectionTimeout(
+                getConnectionTimeoutMs().get(), TimeUnit.MILLISECONDS);
+      }
+      if (getMaxTransactionRetryTimeMs() != null
+          && getMaxTransactionRetryTimeMs().get() != null
+          && getMaxTransactionRetryTimeMs().get() > 0) {
+        configBuilder =
+            configBuilder.withMaxTransactionRetryTime(
+                getMaxTransactionRetryTimeMs().get(), TimeUnit.MILLISECONDS);
+      }
+
+      // Set sane Logging level
+      //
+      configBuilder = configBuilder.withLogging(Logging.javaUtilLogging(Level.WARNING));
+
+      // Now we have the configuration for the driver
+      //
+      Config config = configBuilder.build();
+
+      // Get the list of the URI to connect with
+      //
+      List<URI> uris = new ArrayList<>();
+      if (getUrl() != null && getUrl().get() != null) {
+        try {
+          uris.add(new URI(getUrl().get()));
+        } catch (URISyntaxException e) {
+          throw new RuntimeException("Error creating URI from URL '" + getUrl().get() + "'", e);
+        }
+      }
+      if (getUrls() != null && getUrls().get() != null) {
+        List<String> urls = getUrls().get();
+        for (String url : urls) {
+          try {
+            uris.add(new URI(url));
+          } catch (URISyntaxException e) {
+            throw new RuntimeException(
+                "Error creating URI '"
+                    + getUrl().get()
+                    + "' from a list of "
+                    + urls.size()
+                    + " URLs",
+                e);
+          }
+        }
+      }
+
+      checkArgument(
+          getUsername() != null && getUsername().get() != null,
+          "please provide a username to connect to Neo4j");
+      checkArgument(
+          getPassword() != null && getPassword().get() != null,
+          "please provide a password to connect to Neo4j");
+
+      Driver driver;
+      if (getRouting() != null && getRouting().get() != null && getRouting().get()) {
+        driver =
+            GraphDatabase.routingDriver(
+                uris, AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      } else {
+        // Just take the first URI that was provided
+        driver =
+            GraphDatabase.driver(
+                uris.get(0), AuthTokens.basic(getUsername().get(), getPassword().get()), config);
+      }
+
+      // Now we create a
+      return driver;
+    }
+
+    /**
+     * The Builder class below is not visible. We use it to service the "with" methods below the
+     * Builder class.
+     */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setUrl(ValueProvider<String> url);
+
+      abstract Builder setUrls(ValueProvider<List<String>> url);
+
+      abstract Builder setUsername(ValueProvider<String> username);
+
+      abstract Builder setPassword(ValueProvider<String> password);
+
+      abstract Builder setEncryption(ValueProvider<Boolean> encryption);
+
+      abstract Builder setConnectionLivenessCheckTimeoutMs(
+          ValueProvider<Long> connectionLivenessCheckTimeoutMs);
+
+      abstract Builder setMaxConnectionLifetimeMs(ValueProvider<Long> maxConnectionLifetimeMs);
+
+      abstract Builder setMaxConnectionPoolSize(ValueProvider<Integer> maxConnectionPoolSize);
+
+      abstract Builder setConnectionAcquisitionTimeoutMs(
+          ValueProvider<Long> connectionAcquisitionTimeoutMs);
+
+      abstract Builder setConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs);
+
+      abstract Builder setMaxTransactionRetryTimeMs(ValueProvider<Long> maxTransactionRetryTimeMs);
+
+      abstract Builder setRouting(ValueProvider<Boolean> routing);
+
+      abstract DriverConfiguration build();
+    }
+  }
+
+  /** This is the class which handles the work behind the {@link #readAll} method. */
+  @AutoValue
+  public abstract static class ReadAll<ParameterT, OutputT>
+      extends PTransform<PCollection<ParameterT>, PCollection<OutputT>> {
+
+    abstract @Nullable SerializableFunction<Void, Driver> getDriverProviderFn();
+
+    abstract @Nullable ValueProvider<String> getDatabase();
+
+    abstract @Nullable ValueProvider<String> getCypher();
+
+    abstract @Nullable ValueProvider<Boolean> getWriteTransaction();
+
+    abstract @Nullable ValueProvider<Long> getTransactionTimeoutMs();
+
+    abstract @Nullable RowMapper<OutputT> getRowMapper();
+
+    abstract @Nullable SerializableFunction<ParameterT, Map<String, Object>>
+        getParametersFunction();
+
+    abstract @Nullable Coder<OutputT> getCoder();
+
+    abstract @Nullable ValueProvider<Long> getFetchSize();
+
+    abstract @Nullable ValueProvider<Boolean> getLogCypher();
+
+    abstract Builder<ParameterT, OutputT> toBuilder();
+
+    // Driver configuration
+    public ReadAll<ParameterT, OutputT> withDriverConfiguration(DriverConfiguration config) {
+      return toBuilder()
+          .setDriverProviderFn(new DriverProviderFromDriverConfiguration(config))
+          .build();
+    }
+
+    // Cypher
+    public ReadAll<ParameterT, OutputT> withCypher(String cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.readAll().withCypher(query) called with null cypher query");
+      return withCypher(ValueProvider.StaticValueProvider.of(cypher));
+    }
+
+    public ReadAll<ParameterT, OutputT> withCypher(ValueProvider<String> cypher) {
+      checkArgument(cypher != null, "Neo4jIO.readAll().withCypher(cypher) called with null cypher");
+      return toBuilder().setCypher(cypher).build();
+    }
+
+    // Transaction timeout
+    public ReadAll<ParameterT, OutputT> withTransactionTimeoutMs(long timeout) {
+      checkArgument(
+          timeout > 0,
+          "Neo4jIO.readAll().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return withTransactionTimeoutMs(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public ReadAll<ParameterT, OutputT> withTransactionTimeoutMs(ValueProvider<Long> timeout) {
+      checkArgument(
+          timeout != null && timeout.get() > 0,
+          "Neo4jIO.readAll().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return toBuilder().setTransactionTimeoutMs(timeout).build();
+    }
+
+    // Database
+    public ReadAll<ParameterT, OutputT> withDatabase(String database) {
+      checkArgument(
+          database != null, "Neo4jIO.readAll().withDatabase(database) called with null database");
+      return withDatabase(ValueProvider.StaticValueProvider.of(database));
+    }
+
+    public ReadAll<ParameterT, OutputT> withDatabase(ValueProvider<String> database) {
+      checkArgument(
+          database != null, "Neo4jIO.readAll().withDatabase(database) called with null database");
+      return toBuilder().setDatabase(database).build();
+    }
+
+    // Fetch size
+    public ReadAll<ParameterT, OutputT> withFetchSize(long fetchSize) {
+      checkArgument(
+          fetchSize > 0, "Neo4jIO.readAll().withFetchSize(query) called with fetchSize<=0");
+      return withFetchSize(ValueProvider.StaticValueProvider.of(fetchSize));
+    }
+
+    public ReadAll<ParameterT, OutputT> withFetchSize(ValueProvider<Long> fetchSize) {
+      checkArgument(
+          fetchSize != null && fetchSize.get() >= 0,
+          "Neo4jIO.readAll().withFetchSize(query) called with fetchSize<=0");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
+    // Row mapper
+    public ReadAll<ParameterT, OutputT> withRowMapper(RowMapper<OutputT> rowMapper) {
+      checkArgument(
+          rowMapper != null,
+          "Neo4jIO.readAll().withRowMapper(rowMapper) called with null rowMapper");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    // Parameters mapper
+    public ReadAll<ParameterT, OutputT> withParametersFunction(
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction) {
+      checkArgument(
+          parametersFunction != null,
+          "Neo4jIO.readAll().withParametersFunction(parametersFunction) called with null parametersFunction");
+      return toBuilder().setParametersFunction(parametersFunction).build();
+    }
+
+    // Coder
+    public ReadAll<ParameterT, OutputT> withCoder(Coder<OutputT> coder) {
+      checkArgument(coder != null, "Neo4jIO.readAll().withCoder(coder) called with null coder");
+      return toBuilder().setCoder(coder).build();
+    }
+
+    // Read/Write transaction
+    public ReadAll<ParameterT, OutputT> withReadTransaction() {
+      return toBuilder()
+          .setWriteTransaction(ValueProvider.StaticValueProvider.of(Boolean.FALSE))
+          .build();
+    }
+
+    public ReadAll<ParameterT, OutputT> withWriteTransaction() {
+      return toBuilder()
+          .setWriteTransaction(ValueProvider.StaticValueProvider.of(Boolean.TRUE))
+          .build();
+    }
+
+    // Log cypher statements
+    public ReadAll<ParameterT, OutputT> withoutCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    public ReadAll<ParameterT, OutputT> withCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    @Override
+    public PCollection<OutputT> expand(PCollection<ParameterT> input) {
+
+      checkArgument(
+          getCypher() != null && getCypher().get() != null,
+          "please provide a cypher statement to execute");
+
+      long fetchSize;
+      if (getFetchSize() == null || getFetchSize().get() <= 0) {
+        fetchSize = 0;
+      } else {
+        fetchSize = getFetchSize().get();
+      }
+
+      String databaseName = null;
+      if (getDatabase() != null) {
+        databaseName = getDatabase().get();
+      }
+
+      boolean writeTransaction = false;
+      if (getWriteTransaction() != null) {
+        writeTransaction = getWriteTransaction().get();
+      }
+
+      long transactionTimeOutMs;
+      if (getTransactionTimeoutMs() == null || getTransactionTimeoutMs().get() < 0) {
+        transactionTimeOutMs = -1;
+      } else {
+        transactionTimeOutMs = getTransactionTimeoutMs().get();
+      }
+
+      boolean logCypher = false;
+      if (getLogCypher() != null) {
+        logCypher = getLogCypher().get();
+      }
+
+      ReadFn<ParameterT, OutputT> readFn =
+          new ReadFn<>(
+              getDriverProviderFn(),
+              getCypher().get(),
+              getRowMapper(),
+              getParametersFunction(),
+              fetchSize,
+              databaseName,
+              writeTransaction,
+              transactionTimeOutMs,
+              logCypher);
+
+      return getOutputPCollection(input, readFn, getCoder());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("cypher", getCypher()));
+      builder.add(DisplayData.item("rowMapper", getRowMapper().getClass().getName()));
+      builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
+      if (getDriverProviderFn() instanceof HasDisplayData) {
+        ((HasDisplayData) getDriverProviderFn()).populateDisplayData(builder);
+      }
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<ParameterT, OutputT> {
+      abstract Builder<ParameterT, OutputT> setDriverProviderFn(
+          SerializableFunction<Void, Driver> driverProviderFn);
+
+      abstract Builder<ParameterT, OutputT> setCypher(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT, OutputT> setDatabase(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT, OutputT> setWriteTransaction(
+          ValueProvider<Boolean> writeTransaction);
+
+      abstract Builder<ParameterT, OutputT> setTransactionTimeoutMs(
+          ValueProvider<Long> transactionTimeoutMs);
+
+      abstract Builder<ParameterT, OutputT> setRowMapper(RowMapper<OutputT> rowMapper);
+
+      abstract Builder<ParameterT, OutputT> setParametersFunction(
+          SerializableFunction<ParameterT, Map<String, Object>> parametersFunction);
+
+      abstract Builder<ParameterT, OutputT> setCoder(Coder<OutputT> coder);
+
+      abstract Builder<ParameterT, OutputT> setFetchSize(ValueProvider<Long> fetchSize);
+
+      abstract Builder<ParameterT, OutputT> setLogCypher(ValueProvider<Boolean> logCypher);
+
+      abstract ReadAll<ParameterT, OutputT> build();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class ReadWriteFn<ParameterT, OutputT> extends DoFn<ParameterT, OutputT> {
+    protected final SerializableFunction<Void, Driver> driverProviderFn;
+    protected final String databaseName;
+    protected final long fetchSize;
+    protected final long transactionTimeoutMs;
+
+    protected transient Driver driver;
+    protected transient Session session;
+    protected transient TransactionConfig transactionConfig;
+
+    private ReadWriteFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String databaseName,
+        long fetchSize,
+        long transactionTimeoutMs) {
+      this.driverProviderFn = driverProviderFn;
+      this.databaseName = databaseName;
+      this.fetchSize = fetchSize;
+      this.transactionTimeoutMs = transactionTimeoutMs;
+    }
+
+    /**
+     * Delay the creation of driver and session until we actually have data to do something with.
+     */
+    @Setup
+    public void setup() {}
+
+    protected void buildDriverAndSession() {
+      if (driver == null && session == null && transactionConfig == null) {
+        driver = driverProviderFn.apply(null);
+
+        SessionConfig.Builder builder = SessionConfig.builder();
+        if (databaseName != null) {
+          builder = builder.withDatabase(databaseName);
+        }
+        builder = builder.withFetchSize(fetchSize);
+        session = driver.session(builder.build());
+
+        TransactionConfig.Builder configBuilder = TransactionConfig.builder();
+        if (transactionTimeoutMs > 0) {
+          configBuilder = configBuilder.withTimeout(Duration.ofMillis(transactionTimeoutMs));
+        }
+        transactionConfig = configBuilder.build();
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      cleanUpDriverSession();
+    }
+
+    @Override
+    protected void finalize() {
+      cleanUpDriverSession();
+    }
+
+    protected void cleanUpDriverSession() {
+      if (session != null) {
+        try {
+          session.close();
+        } finally {
+          session = null;
+        }
+      }
+      if (driver != null) {
+        try {
+          driver.close();
+        } finally {
+          driver = null;
+        }
+      }
+    }
+
+    protected String getParametersString(Map<String, Object> parametersMap) {
+      StringBuilder parametersString = new StringBuilder();
+      parametersMap
+          .keySet()
+          .forEach(
+              key -> {
+                if (parametersString.length() > 0) {
+                  parametersString.append(',');
+                }
+                parametersString.append(key).append('=');
+                Object value = parametersMap.get(key);
+                if (value == null) {
+                  parametersString.append("<null>");
+                } else {
+                  parametersString.append(value);
+                }
+              });
+      return parametersString.toString();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class ReadFn<ParameterT, OutputT> extends ReadWriteFn<ParameterT, OutputT> {
+    private final String cypher;
+    private final RowMapper<OutputT> rowMapper;
+    private final SerializableFunction<ParameterT, Map<String, Object>> parametersFunction;
+    private final boolean writeTransaction;
+    private final boolean logCypher;
+
+    private ReadFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String cypher,
+        RowMapper<OutputT> rowMapper,
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction,
+        long fetchSize,
+        String databaseName,
+        boolean writeTransaction,
+        long transactionTimeoutMs,
+        boolean logCypher) {
+      super(driverProviderFn, databaseName, fetchSize, transactionTimeoutMs);
+      this.cypher = cypher;
+      this.rowMapper = rowMapper;
+      this.parametersFunction = parametersFunction;
+      this.writeTransaction = writeTransaction;
+      this.logCypher = logCypher;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      // Some initialization needs to take place as late as possible.
+      // If this method is not called (no input) then we don't need to create a session
+      //
+      if (session == null) {
+        buildDriverAndSession();
+      }
+
+      // Map the input data to the parameters map...
+      //
+      ParameterT parameters = context.element();
+      final Map<String, Object> parametersMap;
+      if (parametersFunction != null) {
+        parametersMap = parametersFunction.apply(parameters);
+      } else {
+        parametersMap = Collections.emptyMap();
+      }
+      executeReadCypherStatement(context, parametersMap);
+    }
+
+    private void executeReadCypherStatement(
+        final ProcessContext processContext, Map<String, Object> parametersMap) {
+      // The transaction.
+      //
+      TransactionWork<Void> transactionWork =
+          transaction -> {
+            Result result = transaction.run(cypher, parametersMap);
+            while (result.hasNext()) {
+              Record record = result.next();
+              try {
+                OutputT outputT = rowMapper.mapRow(record);
+                processContext.output(outputT);
+              } catch (Exception e) {
+                throw new RuntimeException("error mapping Neo4j record to row", e);
+              }
+            }
+
+            // No specific Neo4j transaction output beyond what goes to the output
+            return null;
+          };
+
+      if (logCypher) {
+        String parametersString = getParametersString(parametersMap);
+
+        String readWrite = writeTransaction ? "write" : "read";
+        LOG.info(
+            "Starting a "
+                + readWrite
+                + " transaction for cypher: "
+                + cypher
+                + ", parameters: "
+                + parametersString);
+      }
+
+      // There are 2 ways to do a transaction on Neo4j: read or write
+      //
+      if (writeTransaction) {
+        session.writeTransaction(transactionWork, transactionConfig);
+      } else {
+        session.readTransaction(transactionWork, transactionConfig);
+      }
+    }
+  }
+
+  /**
+   * Wraps a {@link DriverConfiguration} to provide a {@link Driver}.
+   *
+   * <p>At most a single {@link Driver} instance will be constructed during pipeline execution for
+   * each unique {@link DriverConfiguration} within the pipeline.
+   */
+  public static class DriverProviderFromDriverConfiguration
+      implements SerializableFunction<Void, Driver>, HasDisplayData {
+    private static final ConcurrentHashMap<DriverConfiguration, Driver> instances =
+        new ConcurrentHashMap<>();
+    private final DriverConfiguration config;
+
+    private DriverProviderFromDriverConfiguration(DriverConfiguration config) {
+      this.config = config;
+    }
+
+    public static SerializableFunction<Void, Driver> of(DriverConfiguration config) {
+      return new DriverProviderFromDriverConfiguration(config);
+    }
+
+    @Override
+    public Driver apply(Void input) {
+      return config.buildDriver();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      config.populateDisplayData(builder);
+    }
+  }
+
+  /** This is the class which handles the work behind the {@link #writeUnwind()} method. */
+  @AutoValue
+  public abstract static class WriteUnwind<ParameterT>
+      extends PTransform<PCollection<ParameterT>, PDone> {
+
+    abstract @Nullable SerializableFunction<Void, Driver> getDriverProviderFn();
+
+    abstract @Nullable ValueProvider<String> getDatabase();
+
+    abstract @Nullable ValueProvider<String> getCypher();
+
+    abstract @Nullable ValueProvider<String> getUnwindMapName();
+
+    abstract @Nullable ValueProvider<Long> getTransactionTimeoutMs();
+
+    abstract @Nullable SerializableFunction<ParameterT, Map<String, Object>>
+        getParametersFunction();
+
+    abstract @Nullable ValueProvider<Long> getBatchSize();
+
+    abstract @Nullable ValueProvider<Boolean> getLogCypher();
+
+    abstract Builder<ParameterT> toBuilder();
+
+    // Driver configuration
+    public WriteUnwind<ParameterT> withDriverConfiguration(DriverConfiguration config) {
+      return toBuilder()
+          .setDriverProviderFn(new DriverProviderFromDriverConfiguration(config))
+          .build();
+    }
+
+    // Cypher
+    public WriteUnwind<ParameterT> withCypher(String cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.writeUnwind().withCypher(query) called with null cypher query");
+      return withCypher(ValueProvider.StaticValueProvider.of(cypher));
+    }
+
+    public WriteUnwind<ParameterT> withCypher(ValueProvider<String> cypher) {
+      checkArgument(
+          cypher != null, "Neo4jIO.writeUnwind().withCypher(cypher) called with null cypher");
+      return toBuilder().setCypher(cypher).build();
+    }
+
+    // UnwindMapName
+    public WriteUnwind<ParameterT> withUnwindMapName(String mapName) {
+      checkArgument(
+          mapName != null,
+          "Neo4jIO.writeUnwind().withUnwindMapName(query) called with null mapName");
+      return withUnwindMapName(ValueProvider.StaticValueProvider.of(mapName));
+    }
+
+    public WriteUnwind<ParameterT> withUnwindMapName(ValueProvider<String> mapName) {
+      checkArgument(
+          mapName != null,
+          "Neo4jIO.writeUnwind().withUnwindMapName(cypher) called with null mapName");
+      return toBuilder().setUnwindMapName(mapName).build();
+    }
+
+    // Transaction timeout
+    public WriteUnwind<ParameterT> withTransactionTimeoutMs(long timeout) {
+      checkArgument(
+          timeout > 0,
+          "Neo4jIO.writeUnwind().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return withTransactionTimeoutMs(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public WriteUnwind<ParameterT> withTransactionTimeoutMs(ValueProvider<Long> timeout) {
+      checkArgument(
+          timeout != null && timeout.get() > 0,
+          "Neo4jIO.writeUnwind().withTransactionTimeOutMs(timeout) called with timeout<=0");
+      return toBuilder().setTransactionTimeoutMs(timeout).build();
+    }
+
+    // Database
+    public WriteUnwind<ParameterT> withDatabase(String database) {
+      checkArgument(
+          database != null,
+          "Neo4jIO.writeUnwind().withDatabase(database) called with null database");
+      return withDatabase(ValueProvider.StaticValueProvider.of(database));
+    }
+
+    public WriteUnwind<ParameterT> withDatabase(ValueProvider<String> database) {
+      checkArgument(
+          database != null,
+          "Neo4jIO.writeUnwind().withDatabase(database) called with null database");
+      return toBuilder().setDatabase(database).build();
+    }
+
+    // Batch size
+    public WriteUnwind<ParameterT> withBatchSize(long batchSize) {
+      checkArgument(
+          batchSize > 0, "Neo4jIO.writeUnwind().withFetchSize(query) called with batchSize<=0");
+      return withBatchSize(ValueProvider.StaticValueProvider.of(batchSize));
+    }
+
+    public WriteUnwind<ParameterT> withBatchSize(ValueProvider<Long> batchSize) {
+      checkArgument(
+          batchSize != null && batchSize.get() >= 0,
+          "Neo4jIO.readAll().withBatchSize(query) called with batchSize<=0");
+      return toBuilder().setBatchSize(batchSize).build();
+    }
+
+    // Parameters mapper
+    public WriteUnwind<ParameterT> withParametersFunction(
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction) {
+      checkArgument(
+          parametersFunction != null,
+          "Neo4jIO.readAll().withParametersFunction(parametersFunction) called with null parametersFunction");
+      return toBuilder().setParametersFunction(parametersFunction).build();
+    }
+
+    // Logging
+    public WriteUnwind<ParameterT> withCypherLogging() {
+      return toBuilder().setLogCypher(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<ParameterT> input) {
+
+      checkArgument(
+          getCypher() != null && getCypher().get() != null,
+          "please provide an unwind cypher statement to execute");
+      checkArgument(
+          getUnwindMapName() != null && getUnwindMapName().get() != null,
+          "please provide an unwind map name");
+
+      String databaseName = null;
+      if (getDatabase() != null) {
+        databaseName = getDatabase().get();
+      }
+
+      String unwindMapName = null;
+      if (getUnwindMapName() != null) {
+        unwindMapName = getUnwindMapName().get();
+      }
+
+      long transactionTimeOutMs;
+      if (getTransactionTimeoutMs() == null || getTransactionTimeoutMs().get() < 0) {
+        transactionTimeOutMs = -1;
+      } else {
+        transactionTimeOutMs = getTransactionTimeoutMs().get();
+      }
+
+      long batchSize;
+      if (getBatchSize() == null || getBatchSize().get() <= 0) {
+        batchSize = 1;
+      } else {
+        batchSize = getBatchSize().get();
+      }
+
+      boolean logCypher = false;
+      if (getLogCypher() != null) {
+        logCypher = getLogCypher().get();
+      }
+
+      WriteUnwindFn<ParameterT> writeFn =
+          new WriteUnwindFn<>(
+              getDriverProviderFn(),
+              getCypher().get(),
+              getParametersFunction(),
+              -1,
+              databaseName,
+              transactionTimeOutMs,
+              batchSize,
+              logCypher,
+              unwindMapName);
+
+      input.apply(ParDo.of(writeFn));
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("cypher", getCypher()));
+      if (getDriverProviderFn() instanceof HasDisplayData) {
+        ((HasDisplayData) getDriverProviderFn()).populateDisplayData(builder);
+      }
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<ParameterT> {
+      abstract Builder<ParameterT> setDriverProviderFn(
+          SerializableFunction<Void, Driver> driverProviderFn);
+
+      abstract Builder<ParameterT> setCypher(ValueProvider<String> cypher);
+
+      abstract Builder<ParameterT> setUnwindMapName(ValueProvider<String> unwindMapName);
+
+      abstract Builder<ParameterT> setDatabase(ValueProvider<String> database);
+
+      abstract Builder<ParameterT> setTransactionTimeoutMs(
+          ValueProvider<Long> transactionTimeoutMs);
+
+      abstract Builder<ParameterT> setParametersFunction(
+          SerializableFunction<ParameterT, Map<String, Object>> parametersFunction);
+
+      abstract Builder<ParameterT> setBatchSize(ValueProvider<Long> batchSize);
+
+      abstract Builder<ParameterT> setLogCypher(ValueProvider<Boolean> logCypher);
+
+      abstract WriteUnwind<ParameterT> build();
+    }
+  }
+
+  /** A {@link DoFn} to execute a Cypher query to read from Neo4j. */
+  private static class WriteUnwindFn<ParameterT> extends ReadWriteFn<ParameterT, Void> {
+
+    private final String cypher;
+    private final SerializableFunction<ParameterT, Map<String, Object>> parametersFunction;
+    private final boolean logCypher;
+    private final long batchSize;
+    private final String unwindMapName;
+
+    private long elementsInput;
+    private boolean loggingDone;
+    private List<Map<String, Object>> unwindList;
+
+    private WriteUnwindFn(
+        SerializableFunction<Void, Driver> driverProviderFn,
+        String cypher,
+        SerializableFunction<ParameterT, Map<String, Object>> parametersFunction,
+        long fetchSize,
+        String databaseName,
+        long transactionTimeoutMs,
+        long batchSize,
+        boolean logCypher,
+        String unwindMapName) {
+      super(driverProviderFn, databaseName, fetchSize, transactionTimeoutMs);
+      this.cypher = cypher;
+      this.parametersFunction = parametersFunction;
+      this.logCypher = logCypher;
+      this.batchSize = batchSize;
+      this.unwindMapName = unwindMapName;
+
+      unwindList = new ArrayList<>();
+
+      elementsInput = 0;
+      loggingDone = false;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      if (session == null) {
+        buildDriverAndSession();
+      }
+
+      // Map the input data to the parameters map...
+      //
+      ParameterT parameters = context.element();
+      if (parametersFunction != null) {
+        // Every input element creates a new Map<String,Object> entry in unwindList
+        //
+        unwindList.add(parametersFunction.apply(parameters));
+      } else {
+        // Someone is writing a bunch of static or procedurally generated values to Neo4j
+        unwindList.add(Collections.emptyMap());
+      }
+      elementsInput++;
+
+      if (elementsInput >= batchSize) {
+        // Execute the cypher query with the collected parameters map
+        //
+        executeCypherUnwindStatement();
+      }
+    }
+
+    private void executeCypherUnwindStatement() {
+      // Wait until the very last moment to connect to Neo4j

Review comment:
       Why? Wouldn't it be nice to the user to fail fast if there's going to be a connection problem, instead of waiting for a whole batch of elements to be queued to error out?

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();

Review comment:
       Marking everything `@Nullable` seems contrary to our actual intention here, which is to have _none_ of these fields be null. (I saw you probably followed JdbcIO for this, so all of my criticism here applies to that class as well.)
   
   Beam does static nullness checking in the Java precommit, and the checker is complaining a lot about this PR because of AutoValue and Nullable, even though you did your due diligence and manually added null checks. https://gist.github.com/ibzib/9d646f58ed60446f4e90d5a579657971
   
   ```java
   getEncryption() != null && getEncryption().get() != null
   ```
   
   The problem with this is that `getEncryption()` is marked Nullable, and even if we check it's not null once, we can't prove to the checker that the second call will return the same result.
   
   On the other hand, if we remove Nullable, we are telling the checker that `getEncryption()` can _never_ be null. AutoValue will enforce that by automatically generating null checks within setter methods, which will also remove a lot of bloat. https://chromium.googlesource.com/external/github.com/google/auto/+/auto-value-1.6.2/value/CHANGES.md#1_3-1_4-functional-changes
   
   Finally, instead of using null as a placeholder for unset values, we should set reasonable default values up front in the `builder()` method. Instructions here: https://github.com/google/auto/blob/master/value/userguide/builders-howto.md#-specify-a-default-value-for-a-property

##########
File path: sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code OuptutT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each execution of the Cypher
+ * statement. In the function simply return a map containing the parameters you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> return record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the parameters {@link
+ * SerializableFunction} converts parameter values to a {@link Map<String, Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      .withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687", "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id),
+ *        "first", row.getString("firstName")
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        .setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a default batch batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, Coder<OutputT> coder) {
+    PCollection<OutputT> output = input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j {@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          .setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int maxConnectionPoolSize) {
+      return withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long connectionTimeoutMs) {
+      return withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Encryption

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org