You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2020/05/21 18:16:13 UTC

[tinkerpop] 06/27: TINKERPOP-2076 Bump to spark 3.0 with jdk11 support

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2076
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit ec660c784340e2fd01c0fc350aed502d4927e52d
Author: stephen <sp...@gmail.com>
AuthorDate: Tue Nov 12 04:13:49 2019 -0500

    TINKERPOP-2076 Bump to spark 3.0 with jdk11 support
---
 CHANGELOG.asciidoc                                 |   3 +-
 gremlin-groovy/pom.xml                             |   2 +-
 pom.xml                                            |   2 +-
 spark-gremlin/pom.xml                              | 254 ++++-----------------
 .../spark/process/computer/MemoryAccumulator.java  |  47 ++--
 .../spark/process/computer/SparkMemory.java        |  16 +-
 6 files changed, 93 insertions(+), 231 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 9f3b9bc..9abe36f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -47,7 +47,8 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
 * Configured GraphBinary as the default binary serialization format for the Java Driver.
 * Configured GraphSON 3.0 as the default text serialization format when no serializer can be determined.
 * Configured GraphSON 3.0 as the default setting for the `GraphSONMapper`.
-* Bump to Neo4j 3.4.11.
+* Bumped to Neo4j 3.4.11.
+* Bumped to Spark 3.0.0.
 * Added a parameterized `TypeTranslator` for use with `GroovyTranslator` that should produce more cache hits.
 * Added support for `TextP` in Neo4j using its string search functions.
 * Changed `TraversalStrategy` application methodology to apply each strategy in turn to each level of the traversal hierarchy starting from root down to children.
diff --git a/gremlin-groovy/pom.xml b/gremlin-groovy/pom.xml
index c4373b4..b890a99 100644
--- a/gremlin-groovy/pom.xml
+++ b/gremlin-groovy/pom.xml
@@ -34,7 +34,7 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.ivy</groupId>
             <artifactId>ivy</artifactId>
-            <version>2.3.0</version>
+            <version>2.4.0</version>
         </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
diff --git a/pom.xml b/pom.xml
index 61aa457..bf22855 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@ limitations under the License.
         <netty.version>4.1.49.Final</netty.version>
         <slf4j.version>1.7.25</slf4j.version>
         <snakeyaml.version>1.15</snakeyaml.version>
-        <spark.version>2.4.0</spark.version>
+        <spark.version>3.0.0-preview</spark.version>
 
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index 695d743..0d7a64d 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -32,7 +32,7 @@
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>14.0.1</version>
+            <version>16.0.1</version>
         </dependency>
         <dependency>
             <groupId>org.apache.tinkerpop</groupId>
@@ -44,257 +44,105 @@
             <artifactId>hadoop-gremlin</artifactId>
             <version>${project.version}</version>
             <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>javax.servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jersey</groupId>
-                    <artifactId>jersey-server</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-net</groupId>
-                    <artifactId>commons-net</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-io</groupId>
-                    <artifactId>commons-io</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
+                <!-- use our snappy as there is conflict within spark-->
                 <exclusion>
                     <groupId>org.xerial.snappy</groupId>
                     <artifactId>snappy-java</artifactId>
                 </exclusion>
+                <!-- use spark's avro -->
                 <exclusion>
                     <groupId>org.apache.avro</groupId>
                     <artifactId>avro</artifactId>
                 </exclusion>
+                <!-- use spark's math -->
                 <exclusion>
                     <groupId>org.apache.commons</groupId>
                     <artifactId>commons-math3</artifactId>
                 </exclusion>
+                <!-- use spark's netty 4-->
                 <exclusion>
                     <groupId>io.netty</groupId>
-                    <artifactId>netty</artifactId>
+                    <artifactId>netty-all</artifactId>
                 </exclusion>
+                <!-- use spark's activation -->
+                <exclusion>
+                    <groupId>javax.activation</groupId>
+                    <artifactId>activation</artifactId>
+                </exclusion>
+                <!-- use zookeeper's netty 3 -->
                 <exclusion>
                     <groupId>io.netty</groupId>
-                    <artifactId>netty-all</artifactId>
+                    <artifactId>netty</artifactId>
                 </exclusion>
+                <!-- use sparks commons-compress -->
                 <exclusion>
-                    <groupId>com.thoughtworks.paranamer</groupId>
-                    <artifactId>paranamer</artifactId>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <!-- SPARK -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_2.12</artifactId>
             <version>${spark.version}</version>
             <exclusions>
-                <!-- self conflicts -->
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-compiler</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpclient</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.scala-lang.modules</groupId>
-                    <artifactId>scala-xml_2.11</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.activation</groupId>
-                    <artifactId>activation</artifactId>
-                </exclusion>
                 <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-mapper-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.jackson</groupId>
-                    <artifactId>jackson-core-asl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-databind</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-reflect</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpcore</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-annotations</artifactId>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-lang3</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-lang</groupId>
-                    <artifactId>commons-lang</artifactId>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>commons-codec</groupId>
-                    <artifactId>commons-codec</artifactId>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.xerial.snappy</groupId>
                     <artifactId>snappy-java</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>curator-framework</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>curator-recipes</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.thoughtworks.paranamer</groupId>
-                    <artifactId>paranamer</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <!-- gremlin-core conflicts -->
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jcl-over-slf4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.ivy</groupId>
-                    <artifactId>ivy</artifactId>
-                </exclusion>
-                <!-- gremlin-groovy conflicts -->
-                <exclusion>
-                    <groupId>jline</groupId>
-                    <artifactId>jline</artifactId>
-                </exclusion>
-                <!-- hadoop conflicts -->
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>curator-client</artifactId>
-                </exclusion>
-                <!-- lgpl conflicts -->
-                <exclusion>
-                    <groupId>com.google.code.findbugs</groupId>
-                    <artifactId>jsr305</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty-all</artifactId>
-                </exclusion>
-                <!-- avro conflicts -->
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-compress</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
-        <!-- consistent dependencies -->
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>2.11.8</version>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang.modules</groupId>
-            <artifactId>scala-xml_2.11</artifactId>
-            <version>1.0.5</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <version>2.6.7</version>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-            <version>${commons.lang.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.thoughtworks.paranamer</groupId>
-            <artifactId>paranamer</artifactId>
-            <version>2.6</version>
-        </dependency>
+        <!-- spark self-conflict and hadoop conflict -->
         <dependency>
             <groupId>org.xerial.snappy</groupId>
             <artifactId>snappy-java</artifactId>
-            <version>1.1.1.7</version>
+            <version>1.1.7.3</version>
         </dependency>
+        <!-- spark self-conflict -->
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
-            <version>4.1.32.Final</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.12.10</version>
         </dependency>
+        <!-- spark self-confict -->
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-            <version>3.9.9.Final</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.10.0</version>
         </dependency>
+        <!-- spark self-confict -->
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-            <version>1.19</version>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.6</version>
+            <exclusions>
+                <!-- use gremlin-groovy's jline -->
+                <exclusion>
+                    <groupId>jline</groupId>
+                    <artifactId>jline</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <!-- TEST -->
         <dependency>
@@ -302,16 +150,6 @@
             <artifactId>gremlin-test</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.objenesis</groupId>
-                    <artifactId>objenesis</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.tinkerpop</groupId>
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
index cf8cb25..cc7b8de 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
@@ -19,37 +19,58 @@
 
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
-import org.apache.spark.AccumulatorParam;
+import org.apache.spark.util.AccumulatorV2;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
  */
-public final class MemoryAccumulator<A> implements AccumulatorParam<ObjectWritable<A>> {
+public final class MemoryAccumulator<A> extends AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> {
 
     private final MemoryComputeKey<A> memoryComputeKey;
+    private ObjectWritable<A> value;
 
-    public MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) {
+    MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) {
+        this(memoryComputeKey, ObjectWritable.empty());
+    }
+
+    private MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey, final ObjectWritable<A> initial) {
         this.memoryComputeKey = memoryComputeKey;
+        this.value = initial;
+    }
+
+    @Override
+    public boolean isZero() {
+        return ObjectWritable.empty().equals(value);
+    }
+
+    @Override
+    public AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> copy() {
+        return new MemoryAccumulator<>(this.memoryComputeKey, this.value);
+    }
+
+    @Override
+    public void reset() {
+        this.value = ObjectWritable.empty();
     }
 
     @Override
-    public ObjectWritable<A> addAccumulator(final ObjectWritable<A> a, final ObjectWritable<A> b) {
-        if (a.isEmpty())
-            return b;
-        if (b.isEmpty())
-            return a;
-        return new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(a.get(), b.get()));
+    public void add(final ObjectWritable<A> v) {
+        if (this.value.isEmpty())
+            this.value = v;
+        if (!v.isEmpty())
+            this.value = new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(value.get(), v.get()));
     }
 
     @Override
-    public ObjectWritable<A> addInPlace(final ObjectWritable<A> a, final ObjectWritable<A> b) {
-        return this.addAccumulator(a, b);
+    public void merge(final AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> other) {
+        this.add(other.value());
     }
 
     @Override
-    public ObjectWritable<A> zero(final ObjectWritable<A> a) {
-        return ObjectWritable.empty();
+    public ObjectWritable<A> value() {
+        return this.value;
     }
 }
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index bf8590e..5a04162 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
-import org.apache.spark.Accumulator;
+import org.apache.spark.util.AccumulatorV2;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class SparkMemory implements Memory.Admin, Serializable {
 
     public final Map<String, MemoryComputeKey> memoryComputeKeys = new HashMap<>();
-    private final Map<String, Accumulator<ObjectWritable>> sparkMemory = new HashMap<>();
+    private final Map<String, AccumulatorV2<ObjectWritable,ObjectWritable>> sparkMemory = new HashMap<>();
     private final AtomicInteger iteration = new AtomicInteger(0);
     private final AtomicLong runtime = new AtomicLong(0l);
     private Broadcast<Map<String, Object>> broadcast;
@@ -62,9 +62,9 @@ public final class SparkMemory implements Memory.Admin, Serializable {
             this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), Operator.assign, false, false));
         }
         for (final MemoryComputeKey memoryComputeKey : this.memoryComputeKeys.values()) {
-            this.sparkMemory.put(
-                    memoryComputeKey.getKey(),
-                    sparkContext.accumulator(ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator<>(memoryComputeKey)));
+            final AccumulatorV2<ObjectWritable, ObjectWritable> accumulator = new MemoryAccumulator<>(memoryComputeKey);
+            JavaSparkContext.toSparkContext(sparkContext).register(accumulator, memoryComputeKey.getKey());
+            this.sparkMemory.put(memoryComputeKey.getKey(), accumulator);
         }
         this.broadcast = sparkContext.broadcast(Collections.emptyMap());
     }
@@ -135,8 +135,10 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         checkKeyValue(key, value);
         if (this.inExecute)
             throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
-        else
-            this.sparkMemory.get(key).setValue(new ObjectWritable<>(value));
+        else {
+            this.sparkMemory.get(key).reset();
+            this.sparkMemory.get(key).add(new ObjectWritable<>(value));
+        }
     }
 
     @Override