Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2020/04/06 12:41:37 UTC
[tinkerpop] 06/27: TINKERPOP-2076 Bump to spark 3.0 with jdk11 support
This is an automated email from the ASF dual-hosted git repository.
spmallette pushed a commit to branch TINKERPOP-2076
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit f3b89c06ed97fab19f821555d380d562d56e4058
Author: stephen <sp...@gmail.com>
AuthorDate: Tue Nov 12 04:13:49 2019 -0500
TINKERPOP-2076 Bump to spark 3.0 with jdk11 support
---
CHANGELOG.asciidoc | 3 +-
gremlin-groovy/pom.xml | 2 +-
pom.xml | 2 +-
spark-gremlin/pom.xml | 254 ++++-----------------
.../spark/process/computer/MemoryAccumulator.java | 47 ++--
.../spark/process/computer/SparkMemory.java | 16 +-
6 files changed, 93 insertions(+), 231 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 02e593a..c14e16c 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -42,7 +42,8 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>.
* Refactored `Traversal` semantics to always expect `EmptyStep` as a parent if it is meant to be the root `Traversal`.
* Configured GraphBinary as the default binary serialization format for the Java Driver.
* Configured GraphSON 3.0 as the default text serialization format when no serializer can be determined.
-* Bump to Neo4j 3.4.11.
+* Bumped to Neo4j 3.4.11.
+* Bumped to Spark 3.0.0.
* Added a parameterized `TypeTranslator` for use with `GroovyTranslator` that should produce more cache hits.
* Added support for `TextP` in Neo4j using its string search functions.
* Changed `TraversalStrategy` application methodology to apply each strategy in turn to each level of the traversal hierarchy starting from root down to children.
diff --git a/gremlin-groovy/pom.xml b/gremlin-groovy/pom.xml
index c4373b4..b890a99 100644
--- a/gremlin-groovy/pom.xml
+++ b/gremlin-groovy/pom.xml
@@ -34,7 +34,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
- <version>2.3.0</version>
+ <version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
diff --git a/pom.xml b/pom.xml
index 423b76a..29c68dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@ limitations under the License.
<netty.version>4.1.42.Final</netty.version>
<slf4j.version>1.7.25</slf4j.version>
<snakeyaml.version>1.15</snakeyaml.version>
- <spark.version>2.4.0</spark.version>
+ <spark.version>3.0.0-preview</spark.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index 695d743..0d7a64d 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -32,7 +32,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>14.0.1</version>
+ <version>16.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
@@ -44,257 +44,105 @@
<artifactId>hadoop-gremlin</artifactId>
<version>${project.version}</version>
<exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
+ <!-- use our snappy as there is a conflict within spark -->
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
+ <!-- use spark's avro -->
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
+ <!-- use spark's math -->
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
+ <!-- use spark's netty 4 -->
<exclusion>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
</exclusion>
+ <!-- use spark's activation -->
+ <exclusion>
+ <groupId>javax.activation</groupId>
+ <artifactId>activation</artifactId>
+ </exclusion>
+ <!-- use zookeeper's netty 3 -->
<exclusion>
<groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
+ <artifactId>netty</artifactId>
</exclusion>
+ <!-- use spark's commons-compress -->
<exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- SPARK -->
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<exclusions>
- <!-- self conflicts -->
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang.modules</groupId>
- <artifactId>scala-xml_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.activation</groupId>
- <artifactId>activation</artifactId>
- </exclusion>
<exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy</artifactId>
</exclusion>
<exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <!-- gremlin-core conflicts -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.ivy</groupId>
- <artifactId>ivy</artifactId>
- </exclusion>
- <!-- gremlin-groovy conflicts -->
- <exclusion>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- </exclusion>
- <!-- hadoop conflicts -->
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- </exclusion>
- <!-- lgpl conflicts -->
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </exclusion>
- <!-- avro conflicts -->
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
</exclusions>
</dependency>
- <!-- consistent dependencies -->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.11.8</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang.modules</groupId>
- <artifactId>scala-xml_2.11</artifactId>
- <version>1.0.5</version>
- <exclusions>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.6.7</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>${commons.lang.version}</version>
- </dependency>
- <dependency>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- <version>2.6</version>
- </dependency>
+ <!-- spark self-conflict and hadoop conflict -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
- <version>1.1.1.7</version>
+ <version>1.1.7.3</version>
</dependency>
+ <!-- spark self-conflict -->
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.32.Final</version>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.12.10</version>
</dependency>
+ <!-- spark self-conflict -->
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.9.9.Final</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.10.0</version>
</dependency>
+ <!-- spark self-conflict -->
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.19</version>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.6</version>
+ <exclusions>
+ <!-- use gremlin-groovy's jline -->
+ <exclusion>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- TEST -->
<dependency>
@@ -302,16 +150,6 @@
<artifactId>gremlin-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.objenesis</groupId>
- <artifactId>objenesis</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
index cf8cb25..cc7b8de 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java
@@ -19,37 +19,58 @@
package org.apache.tinkerpop.gremlin.spark.process.computer;
-import org.apache.spark.AccumulatorParam;
+import org.apache.spark.util.AccumulatorV2;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
+ * @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public final class MemoryAccumulator<A> implements AccumulatorParam<ObjectWritable<A>> {
+public final class MemoryAccumulator<A> extends AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> {
private final MemoryComputeKey<A> memoryComputeKey;
+ private ObjectWritable<A> value;
- public MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) {
+ MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) {
+ this(memoryComputeKey, ObjectWritable.empty());
+ }
+
+ private MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey, final ObjectWritable<A> initial) {
this.memoryComputeKey = memoryComputeKey;
+ this.value = initial;
+ }
+
+ @Override
+ public boolean isZero() {
+ return ObjectWritable.empty().equals(value);
+ }
+
+ @Override
+ public AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> copy() {
+ return new MemoryAccumulator<>(this.memoryComputeKey, this.value);
+ }
+
+ @Override
+ public void reset() {
+ this.value = ObjectWritable.empty();
}
@Override
- public ObjectWritable<A> addAccumulator(final ObjectWritable<A> a, final ObjectWritable<A> b) {
- if (a.isEmpty())
- return b;
- if (b.isEmpty())
- return a;
- return new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(a.get(), b.get()));
+ public void add(final ObjectWritable<A> v) {
+ if (this.value.isEmpty())
+ this.value = v;
+ else if (!v.isEmpty())
+ this.value = new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(this.value.get(), v.get()));
}
@Override
- public ObjectWritable<A> addInPlace(final ObjectWritable<A> a, final ObjectWritable<A> b) {
- return this.addAccumulator(a, b);
+ public void merge(final AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> other) {
+ this.add(other.value());
}
@Override
- public ObjectWritable<A> zero(final ObjectWritable<A> a) {
- return ObjectWritable.empty();
+ public ObjectWritable<A> value() {
+ return this.value;
}
}
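
For context on the change above: Spark 2.0 retired the AccumulatorParam callback interface in favor of the AccumulatorV2 base class, which owns its state (isZero/copy/reset/add/merge/value) and is registered with the SparkContext instead of being created through it. A minimal sketch of that lifecycle, using Spark's built-in LongAccumulator rather than MemoryAccumulator (the class name, master URL, and sample data are illustrative, not part of the commit):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.util.LongAccumulator;

    import java.util.Arrays;

    public class AccumulatorV2Demo {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setMaster("local[*]").setAppName("accumulator-v2-demo"));
            final LongAccumulator counter = new LongAccumulator();
            // register() replaces the old sparkContext.accumulator(initial, name, param) call
            JavaSparkContext.toSparkContext(sc).register(counter, "counter");
            // each task works on a copy() of the accumulator, calls add() on it, and
            // Spark merge()s the per-task copies back into this driver-side instance
            sc.parallelize(Arrays.asList(1L, 2L, 3L, 4L)).foreach(x -> counter.add(x));
            System.out.println(counter.value()); // 10
            sc.stop();
        }
    }

The same register-then-add pattern appears in the SparkMemory diff that follows.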
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
index bf8590e..5a04162 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.spark.process.computer;
-import org.apache.spark.Accumulator;
+import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
public final class SparkMemory implements Memory.Admin, Serializable {
public final Map<String, MemoryComputeKey> memoryComputeKeys = new HashMap<>();
- private final Map<String, Accumulator<ObjectWritable>> sparkMemory = new HashMap<>();
+ private final Map<String, AccumulatorV2<ObjectWritable,ObjectWritable>> sparkMemory = new HashMap<>();
private final AtomicInteger iteration = new AtomicInteger(0);
private final AtomicLong runtime = new AtomicLong(0l);
private Broadcast<Map<String, Object>> broadcast;
@@ -62,9 +62,9 @@ public final class SparkMemory implements Memory.Admin, Serializable {
this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), Operator.assign, false, false));
}
for (final MemoryComputeKey memoryComputeKey : this.memoryComputeKeys.values()) {
- this.sparkMemory.put(
- memoryComputeKey.getKey(),
- sparkContext.accumulator(ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator<>(memoryComputeKey)));
+ final AccumulatorV2<ObjectWritable, ObjectWritable> accumulator = new MemoryAccumulator<>(memoryComputeKey);
+ JavaSparkContext.toSparkContext(sparkContext).register(accumulator, memoryComputeKey.getKey());
+ this.sparkMemory.put(memoryComputeKey.getKey(), accumulator);
}
this.broadcast = sparkContext.broadcast(Collections.emptyMap());
}
@@ -135,8 +135,10 @@ public final class SparkMemory implements Memory.Admin, Serializable {
checkKeyValue(key, value);
if (this.inExecute)
throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key);
- else
- this.sparkMemory.get(key).setValue(new ObjectWritable<>(value));
+ else {
+ this.sparkMemory.get(key).reset();
+ this.sparkMemory.get(key).add(new ObjectWritable<>(value));
+ }
}
@Override
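
A closing note on the set() rewrite above: the AccumulatorV2 base contract has no general setValue(), so SparkMemory expresses assignment as reset() followed by add(): reset() returns the accumulator to its zero value, and add() on a zeroed accumulator simply stores the new value (for MemoryAccumulator, empty plus v is v regardless of the key's reducer). A standalone sketch of the same pattern with a LongAccumulator (the class name is illustrative):

    import org.apache.spark.util.LongAccumulator;

    public class ResetAddDemo {
        public static void main(final String[] args) {
            final LongAccumulator acc = new LongAccumulator();
            acc.add(7L);
            // no setValue() in the generic AccumulatorV2 contract, so assignment is
            // phrased as a reset back to zero followed by a single add
            acc.reset();
            acc.add(42L);
            System.out.println(acc.value()); // 42
        }
    }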