You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/21 20:33:44 UTC

incubator-tinkerpop git commit: added GryoSerializerIntegrateTest that verifies that, when Spark is under stress and has to spill to disk, the appropriate classes are registered with GryoSerializer. This test helped to expose the CompactBuffer registra

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 889e38e5a -> 484746b96


added GryoSerializerIntegrateTest that verifies that, when Spark is under stress and has to spill to disk, the appropriate classes are registered with GryoSerializer. This test helped to expose the CompactBuffer registration issue. Note that this test takes about 10 minutes to run, but it is an integration test so.... Also, found a bug where, if the RDD is read from the context, its persistence level cannot be changed. Thus, SparkGraphComputer now checks for that situation. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/484746b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/484746b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/484746b9

Branch: refs/heads/master
Commit: 484746b96b4505b4c87eb7b8739597c6acda71ea
Parents: 889e38e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 21 12:33:48 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 21 12:33:48 2016 -0700

----------------------------------------------------------------------
 .../apache/tinkerpop/gremlin/TestHelper.java    | 19 +++-
 .../process/computer/SparkGraphComputer.java    |  8 +-
 .../spark/structure/io/SparkContextStorage.java |  1 +
 .../gremlin/spark/AbstractSparkTest.java        | 17 +---
 .../io/gryo/GryoSerializerIntegrateTest.java    | 99 ++++++++++++++++++++
 5 files changed, 126 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/484746b9/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/TestHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/TestHelper.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/TestHelper.java
index 0633f46..303b1e3 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/TestHelper.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/TestHelper.java
@@ -20,7 +20,9 @@ package org.apache.tinkerpop.gremlin;
 
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
 import org.apache.tinkerpop.gremlin.structure.util.Comparators;
@@ -33,6 +35,7 @@ import java.io.InputStream;
 import java.net.URL;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -49,7 +52,8 @@ public final class TestHelper {
     private static final String SEP = File.separator;
     public static final String TEST_DATA_RELATIVE_DIR = "test-case-data";
 
-    private TestHelper() {}
+    private TestHelper() {
+    }
 
     /**
      * Creates a {@link File} reference that points to a directory relative to the supplied class in the
@@ -251,4 +255,17 @@ public final class TestHelper {
         else
             throw new IllegalArgumentException("The provided object must be a graph object: " + original.getClass().getCanonicalName());
     }
+
+    /** Populates {@code graph} with {@code numberOfVertices} vertices (ids 0..n-1), each given a random number (below {@code maxNumberOfEdgesPerVertex}) of outgoing "link" edges to randomly chosen vertices. */
+    public static void createRandomGraph(final Graph graph, final int numberOfVertices, final int maxNumberOfEdgesPerVertex) {
+        final Random random = new Random();
+        for (int i = 0; i < numberOfVertices; i++)
+            graph.addVertex(T.id, i);
+        graph.vertices().forEachRemaining(vertex -> {
+            // draw the edge count once per vertex; re-drawing the bound on every loop test skews vertices toward very few edges
+            final int edgeCount = random.nextInt(maxNumberOfEdgesPerVertex);
+            for (int i = 0; i < edgeCount; i++)
+                vertex.addEdge("link", graph.vertices(random.nextInt(numberOfVertices)).next());
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/484746b9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index bb7e8bb..4ec3333 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -55,6 +55,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -134,6 +135,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             final long startTime = System.currentTimeMillis();
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
             final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
+            // final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, Object.class));
+            final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class));
             SparkMemory memory = null;
@@ -166,7 +169,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     if (this.workersSet && graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
                         graphRDD = graphRDD.coalesce(this.workers);
                     // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
-                    graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
+                    if (!inputFromSpark)
+                        graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
                 } catch (final InstantiationException | IllegalAccessException e) {
                     throw new IllegalStateException(e.getMessage(), e);
                 }
@@ -277,7 +281,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 for (final String path : paths) {
                     final File file = new File(path);
                     if (file.exists())
-                        Stream.of(file.listFiles()).filter(f -> f.getName().contains(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
+                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
                     else
                         this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
                 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/484746b9/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 6801304..299d973 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -90,6 +90,7 @@ public final class SparkContextStorage implements Storage {
             return false;
         for (final String rdd : rdds) {
             Spark.getRDD(rdd).toJavaRDD().filter(a -> true).setName(rdd.equals(sourceLocation) ? targetLocation : rdd.replace(sourceLocation, targetLocation)).cache().count();
+            // TODO: this should use the original storage level
         }
         return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/484746b9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
index 2ad0c14..985b832 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
@@ -26,12 +26,7 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
-import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
-import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.junit.After;
@@ -58,22 +53,14 @@ public abstract class AbstractSparkTest {
         logger.info("SparkContext has been closed for " + this.getClass().getCanonicalName() + "-setupTest");
     }
 
-    protected Configuration getBaseConfiguration(final String inputLocation) {
+    protected Configuration getBaseConfiguration() {
         final BaseConfiguration configuration = new BaseConfiguration();
         configuration.setDelimiterParsingDisabled(true);
         configuration.setProperty("spark.master", "local[4]");
         configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        configuration.setProperty("spark.kryo.registrationRequired", true);
         configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        if (inputLocation.contains(".kryo"))
-            configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        else if (inputLocation.contains(".json"))
-            configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GraphSONInputFormat.class.getCanonicalName());
-        else
-            configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
-
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
         return configuration;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/484746b9/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerIntegrateTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerIntegrateTest.java
new file mode 100644
index 0000000..c493be4
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializerIntegrateTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.TestHelper;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/** Integration test that runs several SparkGraphComputer jobs over a large random graph using the GryoSerializer-based configuration returned by getBaseConfiguration().
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GryoSerializerIntegrateTest extends AbstractSparkTest {
+
+    @Test
+    public void shouldHaveAllRegisteredGryoSerializerClasses() throws Exception {
+        // this is a stress test that ensures that when data is spilling to disk, persisted to an RDD, etc. the correct classes are registered with GryoSerializer; it writes a 200000-vertex random graph to a Gryo file and then runs four Spark jobs against it.
+        final TinkerGraph randomGraph = TinkerGraph.open();
+        int totalVertices = 200000;
+        TestHelper.createRandomGraph(randomGraph, totalVertices, 100);
+        final String inputLocation = TestHelper.makeTestDataDirectory(GryoSerializerIntegrateTest.class, UUID.randomUUID().toString()) + "/random-graph.kryo";
+        randomGraph.io(IoCore.gryo()).writeGraph(inputLocation);
+        randomGraph.clear(); // the in-memory graph is not used again once it has been written to inputLocation
+        randomGraph.close();
+        // run 1: read the Gryo file with Gryo output format configured and context persistence off; the vertex count via SparkGraphComputer must equal totalVertices
+        final String outputLocation = TestHelper.makeTestDataDirectory(GryoSerializerIntegrateTest.class, UUID.randomUUID().toString());
+        Configuration configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false);
+        Graph graph = GraphFactory.open(configuration);
+        assertEquals(totalVertices, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next().longValue());
+        // run 2: read the Gryo file, run two PageRank iterations, and send the result to a PersistedOutputRDD ("persisted-rdd", DISK_ONLY) with context persistence on
+        configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "DISK_ONLY");
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "persisted-rdd"); // reused below as the input location of runs 3 and 4
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        graph = GraphFactory.open(configuration);
+        assertEquals(totalVertices, graph.compute(SparkGraphComputer.class).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph().traversal().V().count().next().longValue());
+        // run 3: read "persisted-rdd" back through PersistedInputRDD, with Gryo output format configured, and count its vertices
+        configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "persisted-rdd");
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        graph = GraphFactory.open(configuration);
+        assertEquals(totalVertices, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next().longValue());
+        // run 4: same persisted input, but with PersistedOutputRDD output and explicit graph/persist storage levels
+        configuration = getBaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, "persisted-rdd");
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY"); // this should be ignored as you can't change the persistence level once created
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_AND_DISK");
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        graph = GraphFactory.open(configuration);
+        assertEquals(totalVertices, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next().longValue());
+    }
+}