You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2020/11/11 15:03:44 UTC

[zeppelin] branch master updated: [ZEPPELIN-4948] Update Neo4j Driver

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 183f849  [ZEPPELIN-4948] Update Neo4j Driver
183f849 is described below

commit 183f849e72c3f9ce8407beeffced2a56c7c0dca0
Author: Andrea Santurbano <sa...@gmail.com>
AuthorDate: Sun Jul 12 21:12:28 2020 +0200

    [ZEPPELIN-4948] Update Neo4j Driver
    
    ### What is this PR for?
    This PR adds support for Neo4j 4.x.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [x] - Bumped the driver version
    * [x] - Added support for Neo4j databases via the `neo4j.database` property
    * [x] - Updated documentation
    * [x] - Switched tests to Testcontainers, as the Neo4j test harness is packaged for Java 11
    * [x] - Added the new property `neo4j.multi.statement`
    
    ### What is the Jira issue?
    [ZEPPELIN-4948](https://issues.apache.org/jira/browse/ZEPPELIN-4948)
    
    ### How should this be tested?
    You can use the dataset provided in the tests and run the same Cypher queries via the Zeppelin UI.
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No (the driver renames packages and classes but supports the same Neo4j versions)
    * Does this need documentation? Yes (provided)
    
    Author: Andrea Santurbano <sa...@gmail.com>
    
    Closes #3851 from conker84/ZEPPELIN-4948 and squashes the following commits:
    
    e3f5a6fe0 [Andrea Santurbano] [ZEPPELIN-4948] Update Neo4j Driver
---
 docs/interpreter/neo4j.md                          |  10 ++
 neo4j/pom.xml                                      |  10 +-
 .../graph/neo4j/Neo4jConnectionManager.java        |  53 ++++++-----
 .../graph/neo4j/Neo4jCypherInterpreter.java        | 100 +++++++++++---------
 .../graph/neo4j/utils/Neo4jConversionUtils.java    |  44 +++++++--
 neo4j/src/main/resources/interpreter-setting.json  |  12 +++
 .../graph/neo4j/Neo4jCypherInterpreterTest.java    | 104 ++++++++++++++++-----
 pom.xml                                            |  16 ++++
 8 files changed, 242 insertions(+), 107 deletions(-)

diff --git a/docs/interpreter/neo4j.md b/docs/interpreter/neo4j.md
index eec9e07..a36e42a 100644
--- a/docs/interpreter/neo4j.md
+++ b/docs/interpreter/neo4j.md
@@ -45,6 +45,16 @@ The Neo4j Interpreter supports all Neo4j versions since v3 via the official [Neo
     <td>The Neo4j's BOLT url.</td>
   </tr>
   <tr>
+    <td>neo4j.database</td>
+    <td></td>
+    <td>The neo4j target database, if empty use the dafault db.</td>
+  </tr>
+  <tr>
+    <td>neo4j.multi.statement</td>
+    <td>true</td>
+    <td>Enables the multi statement management, if true it computes multiple queries separated by semicolon.</td>
+  </tr>
+  <tr>
     <td>neo4j.auth.type</td>
     <td>BASIC</td>
     <td>The Neo4j's authentication type (NONE, BASIC).</td>
diff --git a/neo4j/pom.xml b/neo4j/pom.xml
index 01d43ef..98007bf 100644
--- a/neo4j/pom.xml
+++ b/neo4j/pom.xml
@@ -34,9 +34,7 @@
   <name>Zeppelin: Neo4j interpreter</name>
   
   <properties>
-  	<neo4j.driver.version>1.7.1</neo4j.driver.version>
-  	<test.neo4j.kernel.version>3.4.10</test.neo4j.kernel.version>
-  	<neo4j.version>3.4.10</neo4j.version>
+  	<neo4j.driver.version>4.1.1</neo4j.driver.version>
   	<jackson.version>2.10.3</jackson.version>
     <interpreter.name>neo4j</interpreter.name>
   </properties>
@@ -65,10 +63,8 @@
     </dependency>
 
     <dependency>
-      <groupId>org.neo4j.test</groupId>
-      <artifactId>neo4j-harness</artifactId>
-      <version>${neo4j.version}</version>
-      <scope>test</scope>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>neo4j</artifactId>
     </dependency>
   </dependencies>
 
diff --git a/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jConnectionManager.java b/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jConnectionManager.java
index 472d31b..d2620d9 100644
--- a/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jConnectionManager.java
+++ b/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jConnectionManager.java
@@ -18,18 +18,21 @@
 package org.apache.zeppelin.graph.neo4j;
 
 import org.apache.commons.lang3.StringUtils;
-import org.neo4j.driver.v1.AuthToken;
-import org.neo4j.driver.v1.AuthTokens;
-import org.neo4j.driver.v1.Config;
-import org.neo4j.driver.v1.Driver;
-import org.neo4j.driver.v1.GraphDatabase;
-import org.neo4j.driver.v1.Session;
-import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.AuthToken;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.SessionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -45,8 +48,9 @@ import org.apache.zeppelin.resource.ResourcePool;
  */
 public class Neo4jConnectionManager {
   static final Logger LOGGER = LoggerFactory.getLogger(Neo4jConnectionManager.class);
-  
+
   public static final String NEO4J_SERVER_URL = "neo4j.url";
+  public static final String NEO4J_DATABASE = "neo4j.database";
   public static final String NEO4J_AUTH_TYPE = "neo4j.auth.type";
   public static final String NEO4J_AUTH_USER = "neo4j.auth.user";
   public static final String NEO4J_AUTH_PASSWORD = "neo4j.auth.password";
@@ -66,6 +70,8 @@ public class Neo4jConnectionManager {
 
   private final AuthToken authToken;
 
+  private final String database;
+
   /**
    * Enum type for the AuthToken.
    */
@@ -73,20 +79,21 @@ public class Neo4jConnectionManager {
 
   public Neo4jConnectionManager(Properties properties) {
     this.neo4jUrl = properties.getProperty(NEO4J_SERVER_URL);
-    this.config = Config.build()
-          .withMaxIdleSessions(Integer.parseInt(properties.getProperty(NEO4J_MAX_CONCURRENCY)))
-          .toConfig();
+    this.config = Config.defaultConfig();
+    this.database = properties.getProperty(NEO4J_DATABASE);
+    this.authToken = initAuth(properties);
+  }
+
+  private AuthToken initAuth(Properties properties) {
     String authType = properties.getProperty(NEO4J_AUTH_TYPE);
     switch (Neo4jAuthType.valueOf(authType.toUpperCase())) {
       case BASIC:
         String username = properties.getProperty(NEO4J_AUTH_USER);
         String password = properties.getProperty(NEO4J_AUTH_PASSWORD);
-        this.authToken = AuthTokens.basic(username, password);
-        break;
+        return AuthTokens.basic(username, password);
       case NONE:
         LOGGER.debug("Creating NONE authentication");
-        this.authToken = AuthTokens.none();
-        break;
+        return AuthTokens.none();
       default:
         throw new RuntimeException("Neo4j authentication type not supported");
     }
@@ -108,11 +115,12 @@ public class Neo4jConnectionManager {
   }
 
   private Session getSession() {
-    return getDriver().session();
+    return getDriver().session(StringUtils.isNotEmpty(database) ?
+            SessionConfig.forDatabase(database) : SessionConfig.defaultConfig());
   }
 
-  public StatementResult execute(String cypherQuery,
-      InterpreterContext interpreterContext) {
+  public List<Record> execute(String cypherQuery,
+                              InterpreterContext interpreterContext) {
     Map<String, Object> params = new HashMap<>();
     if (interpreterContext != null) {
       ResourcePool resourcePool = interpreterContext.getResourcePool();
@@ -126,15 +134,14 @@ public class Neo4jConnectionManager {
       }
     }
     LOGGER.debug("Executing cypher query {} with params {}", cypherQuery, params);
-    StatementResult result;
     try (Session session = getSession()) {
-      result = params.isEmpty()
-            ? getSession().run(cypherQuery) : getSession().run(cypherQuery, params);
+      final Result result = params.isEmpty()
+              ? session.run(cypherQuery) : session.run(cypherQuery, params);
+      return result.list();
     }
-    return result;
   }
 
-  public StatementResult execute(String cypherQuery) {
+  public List<Record> execute(String cypherQuery) {
     return execute(cypherQuery, null);
   }
 
diff --git a/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreter.java b/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreter.java
index 1944943..b13ed9d 100644
--- a/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreter.java
+++ b/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreter.java
@@ -20,28 +20,25 @@ package org.apache.zeppelin.graph.neo4j;
 import org.apache.commons.lang3.StringUtils;
 import org.neo4j.driver.internal.types.InternalTypeSystem;
 import org.neo4j.driver.internal.util.Iterables;
-import org.neo4j.driver.v1.Record;
-import org.neo4j.driver.v1.StatementResult;
-import org.neo4j.driver.v1.Value;
-import org.neo4j.driver.v1.types.Node;
-import org.neo4j.driver.v1.types.Relationship;
-import org.neo4j.driver.v1.types.TypeSystem;
-import org.neo4j.driver.v1.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.types.Node;
+import org.neo4j.driver.types.Relationship;
+import org.neo4j.driver.types.TypeSystem;
+import org.neo4j.driver.util.Pair;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.zeppelin.graph.neo4j.utils.Neo4jConversionUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -70,13 +67,26 @@ public class Neo4jCypherInterpreter extends Interpreter {
 
   private final Neo4jConnectionManager neo4jConnectionManager;
 
-  private final ObjectMapper jsonMapper = new ObjectMapper();
+  private final boolean isMultiStatementEnabled;
+
+  public static final String NEO4J_MULTI_STATEMENT = "neo4j.multi.statement";
 
   public Neo4jCypherInterpreter(Properties properties) {
     super(properties);
+    boolean isMultiStatementEnabled = isMultiStatementEnabled(properties);
+    this.isMultiStatementEnabled = isMultiStatementEnabled;
     this.neo4jConnectionManager = new Neo4jConnectionManager(properties);
   }
 
+  private boolean isMultiStatementEnabled(Properties properties) {
+    try {
+      return Boolean.parseBoolean(properties
+              .getProperty(NEO4J_MULTI_STATEMENT, "true"));
+    } catch (Exception ignored) {
+      return true;
+    }
+  }
+
   @Override
   public void open() {
     this.neo4jConnectionManager.open();
@@ -90,9 +100,10 @@ public class Neo4jCypherInterpreter extends Interpreter {
   public Map<String, String> getLabels(boolean refresh) {
     if (labels == null || refresh) {
       Map<String, String> old = labels == null ?
-          new LinkedHashMap<String, String>() : new LinkedHashMap<>(labels);
+              new LinkedHashMap<>() : new LinkedHashMap<>(labels);
       labels = new LinkedHashMap<>();
-      StatementResult result = this.neo4jConnectionManager.execute("CALL db.labels()");
+      Iterator<Record> result = this.neo4jConnectionManager.execute("CALL db.labels()")
+              .iterator();
       Set<String> colors = new HashSet<>();
       while (result.hasNext()) {
         Record record = result.next();
@@ -111,7 +122,8 @@ public class Neo4jCypherInterpreter extends Interpreter {
   private Set<String> getTypes(boolean refresh) {
     if (types == null || refresh) {
       types = new HashSet<>();
-      StatementResult result = this.neo4jConnectionManager.execute("CALL db.relationshipTypes()");
+      Iterator<Record> result = this.neo4jConnectionManager.execute("CALL db.relationshipTypes()")
+              .iterator();
       while (result.hasNext()) {
         Record record = result.next();
         types.add(record.get("relationshipType").asString());
@@ -126,9 +138,28 @@ public class Neo4jCypherInterpreter extends Interpreter {
     if (StringUtils.isBlank(cypherQuery)) {
       return new InterpreterResult(Code.SUCCESS);
     }
+    final List<String> queries = isMultiStatementEnabled ?
+            Arrays.asList(cypherQuery.split(";[^'|^\"|^(\\w+`)]")) : Arrays.asList(cypherQuery);
+    if (queries.size() == 1) {
+      final String query = queries.get(0);
+      return runQuery(query, interpreterContext);
+    } else {
+      final int lastIndex = queries.size() - 1;
+      final List<String> subQueries = queries.subList(0, lastIndex);
+      for (String query : subQueries) {
+        runQuery(query, interpreterContext);
+      }
+      return runQuery(queries.get(lastIndex), interpreterContext);
+    }
+  }
+
+  private InterpreterResult runQuery(String cypherQuery, InterpreterContext interpreterContext) {
+    if (StringUtils.isBlank(cypherQuery)) {
+      return new InterpreterResult(Code.SUCCESS);
+    }
     try {
-      StatementResult result = this.neo4jConnectionManager.execute(cypherQuery,
-              interpreterContext);
+      Iterator<Record> result = this.neo4jConnectionManager.execute(cypherQuery,
+              interpreterContext).iterator();
       Set<Node> nodes = new HashSet<>();
       Set<Relationship> relationships = new HashSet<>();
       List<String> columns = new ArrayList<>();
@@ -166,14 +197,14 @@ public class Neo4jCypherInterpreter extends Interpreter {
   }
 
   private void setTabularResult(String key, Object obj, List<String> columns, List<String> line,
-      TypeSystem typeSystem) {
+                                TypeSystem typeSystem) {
     if (obj instanceof Value) {
       Value value = (Value) obj;
       if (value.hasType(typeSystem.MAP())) {
         Map<String, Object> map = value.asMap();
         for (Entry<String, Object> entry : map.entrySet()) {
           setTabularResult(String.format(MAP_KEY_TEMPLATE, key, entry.getKey()), entry.getValue(),
-                columns, line, typeSystem);
+                  columns, line, typeSystem);
         }
       } else {
         addValueToLine(key, columns, line, value);
@@ -201,30 +232,11 @@ public class Neo4jCypherInterpreter extends Interpreter {
     }
     if (value != null) {
       if (value instanceof Value) {
-        Value val = (Value) value;
-        if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.LIST())) {
-          value = val.asList();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.MAP())) {
-          value = val.asMap();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.POINT())) {
-          value = val.asPoint();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.DATE())) {
-          value = val.asLocalDate();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.TIME())) {
-          value = val.asOffsetTime();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.LOCAL_TIME())) {
-          value = val.asLocalTime();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.LOCAL_DATE_TIME())) {
-          value = val.asLocalDateTime();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.DATE_TIME())) {
-          value = val.asZonedDateTime();
-        } else if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.DURATION())) {
-          value = val.asIsoDuration();
-        }
+        value = Neo4jConversionUtils.convertValue((Value) value);
       }
-      if (value instanceof Collection) {
+      if (value instanceof Collection || value instanceof Map) {
         try {
-          value = jsonMapper.writer().writeValueAsString(value);
+          value = Neo4jConversionUtils.JSON_MAPPER.writer().writeValueAsString(value);
         } catch (Exception e) {
           LOGGER.debug("ignored exception: " + e.getMessage());
         }
@@ -269,14 +281,14 @@ public class Neo4jCypherInterpreter extends Interpreter {
       nodesList.add(Neo4jConversionUtils.toZeppelinNode(node, labels));
     }
     return new GraphResult(Code.SUCCESS,
-        new GraphResult.Graph(nodesList, relsList, labels, getTypes(true), true));
+            new GraphResult.Graph(nodesList, relsList, labels, getTypes(true), true));
   }
 
   @Override
   public Scheduler getScheduler() {
     return SchedulerFactory.singleton()
-        .createOrGetParallelScheduler(Neo4jCypherInterpreter.class.getName() + this.hashCode(),
-            Integer.parseInt(getProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY)));
+            .createOrGetParallelScheduler(Neo4jCypherInterpreter.class.getName() + this.hashCode(),
+                    Integer.parseInt(getProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY)));
   }
 
   @Override
diff --git a/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/utils/Neo4jConversionUtils.java b/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/utils/Neo4jConversionUtils.java
index 571afa9..514ffd5 100644
--- a/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/utils/Neo4jConversionUtils.java
+++ b/neo4j/src/main/java/org/apache/zeppelin/graph/neo4j/utils/Neo4jConversionUtils.java
@@ -17,8 +17,12 @@
 
 package org.apache.zeppelin.graph.neo4j.utils;
 
-import org.neo4j.driver.v1.types.Node;
-import org.neo4j.driver.v1.types.Relationship;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.types.InternalTypeSystem;
+import org.neo4j.driver.types.Node;
+import org.neo4j.driver.types.Relationship;
 
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -29,13 +33,15 @@ import java.util.Set;
  */
 public class Neo4jConversionUtils {
   private Neo4jConversionUtils() {}
-  
+
+  public static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
   private static final String[] LETTERS = "0123456789ABCDEF".split("");
 
   public static final String COLOR_GREY = "#D3D3D3";
-  
+
   public static org.apache.zeppelin.tabledata.Node toZeppelinNode(Node n,
-      Map<String, String> graphLabels) {
+                                                                  Map<String, String> graphLabels) {
     Set<String> labels = new LinkedHashSet<>();
     String firstLabel = null;
     for (String label : n.labels()) {
@@ -44,13 +50,31 @@ public class Neo4jConversionUtils {
       }
       labels.add(label);
     }
-    return new org.apache.zeppelin.tabledata.Node(n.id(), n.asMap(),
-        labels);
+    return new org.apache.zeppelin.tabledata.Node(n.id(),
+            n.asMap(Neo4jConversionUtils::convertValue),
+            labels);
+  }
+
+  public static String convertValue(Value val) {
+    if (val == null) return null;
+    try {
+      final String value;
+      if (val.hasType(InternalTypeSystem.TYPE_SYSTEM.LIST())
+              || val.hasType(InternalTypeSystem.TYPE_SYSTEM.MAP())) {
+        value = JSON_MAPPER.writeValueAsString(val.asObject());
+      } else {
+        value = val.asObject().toString();
+      }
+      return value;
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
   }
-  
+
   public static org.apache.zeppelin.tabledata.Relationship toZeppelinRelationship(Relationship r) {
-    return new org.apache.zeppelin.tabledata.Relationship(r.id(), r.asMap(),
-        r.startNodeId(), r.endNodeId(), r.type());
+    return new org.apache.zeppelin.tabledata.Relationship(r.id(),
+            r.asMap(Neo4jConversionUtils::convertValue),
+            r.startNodeId(), r.endNodeId(), r.type());
   }
 
   public static String getRandomLabelColor() {
diff --git a/neo4j/src/main/resources/interpreter-setting.json b/neo4j/src/main/resources/interpreter-setting.json
index 8db4367..db75c73 100644
--- a/neo4j/src/main/resources/interpreter-setting.json
+++ b/neo4j/src/main/resources/interpreter-setting.json
@@ -10,6 +10,18 @@
         "defaultValue": "bolt://localhost:7687",
         "description": "The Neo4j's BOLT url."
       },
+      "neo4j.database": {
+        "envName": null,
+        "propertyName": "neo4j.database",
+        "defaultValue": "",
+        "description": "The Neo4j target database, if empty use the dafault db."
+      },
+      "neo4j.multi.statement": {
+        "envName": null,
+        "propertyName": "neo4j.multi.statement",
+        "defaultValue": "true",
+        "description": "Enables the multi statement management, if true it computes multiple queries separated by semicolon."
+      },
       "neo4j.auth.type": {
         "envName": null,
         "propertyName": "neo4j.auth.type",
diff --git a/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java b/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
index 7940d5f..4471993 100644
--- a/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
+++ b/neo4j/src/test/java/org/apache/zeppelin/graph/neo4j/Neo4jCypherInterpreterTest.java
@@ -24,18 +24,25 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.graph.GraphResult;
+import org.apache.zeppelin.tabledata.Node;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
-import org.neo4j.harness.ServerControls;
-import org.neo4j.harness.TestServerBuilders;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Session;
+import org.testcontainers.containers.Neo4jContainer;
+import org.testcontainers.shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -47,7 +54,9 @@ public class Neo4jCypherInterpreterTest {
 
   private InterpreterContext context;
 
-  private static ServerControls server;
+  @ClassRule
+  public static Neo4jContainer neo4jContainer = new Neo4jContainer("neo4j:4.1.1")
+          .withoutAuthentication();
 
   private static final Gson gson = new Gson();
 
@@ -59,32 +68,28 @@ public class Neo4jCypherInterpreterTest {
                   "address: point({ longitude: 56.7, latitude: 12.78, height: 8 }), " +
                   "birth: date('1984-04-04')}))";
   private static final String CHPHER_UNWIND = "UNWIND range(1,100) as x "
-        + "MATCH (n), (m) WHERE id(n) = x AND id(m) = toInt(rand() * 100) "
+        + "MATCH (n), (m) WHERE id(n) = x AND id(m) = toInteger(rand() * 100) "
         + "CREATE (n)-[:%s]->(m)";
-  
+
   private static final String TABLE_RESULT_PREFIX = "%table ";
   private static final String NETWORK_RESULT_PREFIX = "%network ";
 
   @BeforeClass
-  public static void setUpNeo4jServer() throws Exception {
-    server = TestServerBuilders.newInProcessBuilder()
-                .withConfig("dbms.security.auth_enabled", "false")
-                .withFixture(String.format(CYPHER_FOREACH, LABEL_PERSON, "x % 10"))
-                .withFixture(String.format(CHPHER_UNWIND, REL_KNOWS))
-                .newServer();
-  }
-
-  @AfterClass
-  public static void tearDownNeo4jServer() throws Exception {
-    server.close();
+  public static void setUpNeo4jServer() {
+    try (Driver driver = GraphDatabase.driver(neo4jContainer.getBoltUrl());
+         Session session = driver.session()) {
+      session.run(String.format(CYPHER_FOREACH, LABEL_PERSON, "x % 10"));
+      session.run(String.format(CHPHER_UNWIND, REL_KNOWS));
+    }
   }
 
   @Before
   public void setUpZeppelin() {
     Properties p = new Properties();
-    p.setProperty(Neo4jConnectionManager.NEO4J_SERVER_URL, server.boltURI().toString());
+    p.setProperty(Neo4jConnectionManager.NEO4J_SERVER_URL, neo4jContainer.getBoltUrl());
     p.setProperty(Neo4jConnectionManager.NEO4J_AUTH_TYPE, Neo4jAuthType.NONE.toString());
     p.setProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY, "50");
+    p.setProperty(Neo4jCypherInterpreter.NEO4J_MULTI_STATEMENT, "false");
     interpreter = new Neo4jCypherInterpreter(p);
     context = InterpreterContext.builder()
         .setInterpreterOut(new InterpreterOutput(null))
@@ -102,14 +107,14 @@ public class Neo4jCypherInterpreterTest {
     InterpreterResult result = interpreter.interpret(
             "return 'a' as colA, 'b' as colB, [1, 2, 3] as colC", context);
     assertEquals(Code.SUCCESS, result.code());
-    final String tableResult = "colA\tcolB\tcolC\n\"a\"\t\"b\"\t[1,2,3]\n";
+    final String tableResult = "colA\tcolB\tcolC\na\tb\t[1,2,3]\n";
     assertEquals(tableResult, result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
 
     result = interpreter.interpret(
             "return 'a' as colA, 'b' as colB, [{key: \"value\"}, {key: 1}] as colC", context);
     assertEquals(Code.SUCCESS, result.code());
     final String tableResultWithMap =
-            "colA\tcolB\tcolC\n\"a\"\t\"b\"\t[{\"key\":\"value\"},{\"key\":1}]\n";
+            "colA\tcolB\tcolC\na\tb\t[{\"key\":\"value\"},{\"key\":1}]\n";
     assertEquals(tableResultWithMap, result.toString().replace(TABLE_RESULT_PREFIX,
             StringUtils.EMPTY));
   }
@@ -131,9 +136,9 @@ public class Neo4jCypherInterpreterTest {
             + "n.address AS address, n.birth AS birth", context);
     assertEquals(Code.SUCCESS, result.code());
     final String tableResult = "name\tage\taddress\tbirth\n" +
-            "\"name1\"\t1\tPoint{srid=4979, x=56.7, y=12.78, z=8.0}\t1984-04-04\n" +
-            "\"name2\"\t2\tPoint{srid=4979, x=56.7, y=12.78, z=8.0}\t1984-04-04\n" +
-            "\"name3\"\t3\tPoint{srid=4979, x=56.7, y=12.78, z=8.0}\t1984-04-04\n";
+            "name1\t1\tPoint{srid=4979, x=56.7, y=12.78, z=8.0}\t1984-04-04\n" +
+            "name2\t2\tPoint{srid=4979, x=56.7, y=12.78, z=8.0}\t1984-04-04\n" +
+            "name3\t3\tPoint{srid=4979, x=56.7, y=12.78, z=8.0}\t1984-04-04\n";
     assertEquals(tableResult, result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
   }
 
@@ -319,4 +324,57 @@ public class Neo4jCypherInterpreterTest {
                     "12.0\t56.0\t1000.0\n",
             result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
   }
+
+  @Test
+  public void testMultiLineInterpreter() {
+    Properties p = new Properties();
+    p.setProperty(Neo4jConnectionManager.NEO4J_SERVER_URL, neo4jContainer.getBoltUrl());
+    p.setProperty(Neo4jConnectionManager.NEO4J_AUTH_TYPE, Neo4jAuthType.NONE.toString());
+    p.setProperty(Neo4jConnectionManager.NEO4J_MAX_CONCURRENCY, "50");
+    p.setProperty(Neo4jCypherInterpreter.NEO4J_MULTI_STATEMENT, "true");
+    Neo4jCypherInterpreter multiLineInterpreter = new Neo4jCypherInterpreter(p);
+    context = InterpreterContext.builder()
+            .setInterpreterOut(new InterpreterOutput(null))
+            .build();
+    InterpreterResult result = multiLineInterpreter.interpret("CREATE (n:Node{name: ';'});" +
+            "\nRETURN 1 AS val;", context);
+    assertEquals(Code.SUCCESS, result.code());
+    assertEquals("val\n1\n",
+            result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
+    result = multiLineInterpreter.interpret("CREATE (n:Node{name: \";\"}); " +
+            "RETURN 2 AS `other;Val`;", context);
+    assertEquals(Code.SUCCESS, result.code());
+    assertEquals("other;Val\n2\n",
+            result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
+    result = multiLineInterpreter.interpret("match (n:Node{name: ';'}) " +
+            "return count(n) AS count", context);
+    assertEquals("count\n2\n",
+            result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
+    result = multiLineInterpreter.interpret("match (n:Node) detach delete n; " +
+            "match (n:Node) return count(n) AS count", context);
+    assertEquals("count\n0\n",
+            result.toString().replace(TABLE_RESULT_PREFIX, StringUtils.EMPTY));
+  }
+
+  @Test
+  public void testNodeDataTypes() throws JsonProcessingException {
+    InterpreterResult result = interpreter.interpret(
+            "CREATE (n:NodeTypes{" +
+                    "dateTime: datetime('2015-06-24T12:50:35.556+0100')," +
+                    "point3d: point({ x:0, y:4, z:1 })})\n" +
+                    "RETURN n",
+            context);
+    assertEquals(Code.SUCCESS, result.code());
+    ObjectMapper jsonMapper = new ObjectMapper();
+
+    GraphResult.Graph graph = jsonMapper.readValue(result.toString()
+            .replace(NETWORK_RESULT_PREFIX, StringUtils.EMPTY), GraphResult.Graph.class);
+
+    final Node node = graph.getNodes().iterator().next();
+    Map<String, Object> expectedMap = new HashMap<>();
+    expectedMap.put("point3d", "Point{srid=9157, x=0.0, y=4.0, z=1.0}");
+    expectedMap.put("dateTime", "2015-06-24T12:50:35.556+01:00");
+    assertEquals(expectedMap, node.getData());
+    interpreter.interpret("MATCH (n:NodeTypes) DETACH DELETE n", context);
+  }
 }
diff --git a/pom.xml b/pom.xml
index 4e19a64..53ab50c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,8 @@
     <plugin.surefire.version>2.17</plugin.surefire.version>
     <plugin.xml.version>1.0.2</plugin.xml.version>
 
+    <testcontainers.version>1.14.3</testcontainers.version>
+
     <plugin.gitcommitid.useNativeGit>false</plugin.gitcommitid.useNativeGit>
 
     <MaxMetaspace>512m</MaxMetaspace>
@@ -954,6 +956,20 @@
         <version>${powermock.version}</version>
         <scope>test</scope>
       </dependency>
+
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>testcontainers</artifactId>
+        <version>${testcontainers.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>neo4j</artifactId>
+        <version>${testcontainers.version}</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>