Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/08 15:20:29 UTC

[04/17] incubator-tinkerpop git commit: Greatly greatly simplified Hadoop OLTP and interactions with HDFS and SparkContext. The trend -- dir/~g for graphs and dir/x for memory. A consistent persistence schema makes everything so much simpler. I always as
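
The schema this commit settles on keeps every artifact of a job under its output directory: the graph RDD at <dir>/~g and each memory (side-effect) key at <dir>/<key>. As a minimal sketch of what that implies for the Constants helpers used throughout the diffs below (the concrete path strings are inferred from the commit message rather than read from Constants itself, and "job-1" is a hypothetical output directory):

    // Hypothetical output directory written by one SparkGraphComputer job.
    final String output = "target/test-output/job-1";
    // Expected to resolve to "target/test-output/job-1/~g" under the dir/~g convention.
    final String graphLocation = Constants.getGraphLocation(output);
    // Expected to resolve to "target/test-output/job-1/clusterCount" under the dir/<key> convention.
    final String memoryLocation = Constants.getMemoryLocation(output, "clusterCount");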

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 2db267f..887e2f9 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -19,11 +19,18 @@
 
 package org.apache.tinkerpop.gremlin.spark.structure.io;
 
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.collection.JavaConversions;
 
@@ -114,11 +121,53 @@ public final class SparkContextStorage implements Storage {
     }
 
     @Override
-    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
-        return IteratorUtils.limit((Iterator) JavaConversions.asJavaIterator(Spark.getRDD(location).toLocalIterator()), totalLines);
+    public Iterator<Vertex> headGraph(final String location, int totalLines, final Class parserClass) {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getSearchGraphLocation(location, this).get());
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, parserClass.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
+        try {
+            if (InputRDD.class.isAssignableFrom(parserClass)) {
+                return IteratorUtils.limit(IteratorUtils.map(((InputRDD) parserClass.getConstructor().newInstance()).readGraphRDD(configuration, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> tuple._2().get()), totalLines);
+            } else if (InputFormat.class.isAssignableFrom(parserClass)) {
+                return IteratorUtils.limit(IteratorUtils.map(new InputFormatRDD().readGraphRDD(configuration, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> tuple._2().get()), totalLines);
+            }
+        } catch (final Exception e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+        throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or a " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
+    }
+
+    @Override
+    public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, int totalLines, Class parserClass) {
+        final Configuration configuration = new BaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getMemoryLocation(location, memoryKey));
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, parserClass.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
+        try {
+            if (InputRDD.class.isAssignableFrom(parserClass)) {
+                return IteratorUtils.limit(IteratorUtils.map(((InputRDD) parserClass.getConstructor().newInstance()).readMemoryRDD(configuration, memoryKey, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> new KeyValue(tuple._1(), tuple._2())), totalLines);
+            } else if (InputFormat.class.isAssignableFrom(parserClass)) {
+                return IteratorUtils.limit(IteratorUtils.map(new InputFormatRDD().readMemoryRDD(configuration, memoryKey, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> new KeyValue(tuple._1(), tuple._2())), totalLines);
+            }
+        } catch (final Exception e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+        throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or a " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
     }
 
+    @Override
+    public Iterator<String> head(final String location, final int totalLines) {
+        return IteratorUtils.limit(IteratorUtils.map(JavaConversions.asJavaIterator(Spark.getRDD(location).toLocalIterator()), Object::toString), totalLines);
+    }
+
+    // TODO: @Override
     public String describe(final String location) {
         return Spark.getRDD(location).toDebugString();
     }
+
+    @Override
+    public String toString() {
+        return StringFactory.storageString(Spark.getContext().master());
+    }
 }
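
For orientation, a minimal sketch of driving the three new head methods above from a SparkContextStorage handle (the calls mirror the new SparkContextStorageTest further down; the types are the ones imported in this file, and the output location plus its persisted RDDs are assumed to exist from an earlier job run with GREMLIN_SPARK_PERSIST_CONTEXT set to true):

    final String output = "target/test-output/job-1";            // hypothetical, written by a previous job
    final Storage storage = SparkContextStorage.open("local[4]");
    // Vertices of the persisted graph RDD at <output>/~g, read back through the persisted-RDD parser.
    storage.headGraph(output, PersistedInputRDD.class)
           .forEachRemaining(System.out::println);
    // Key/value pairs of the persisted memory RDD at <output>/clusterCount.
    storage.headMemory(output, "clusterCount", PersistedInputRDD.class)
           .forEachRemaining(kv -> System.out.println(kv.getKey() + " -> " + kv.getValue()));
    // Plain string view of any RDD in the context, limited to its first ten elements.
    storage.head(Constants.getGraphLocation(output), 10)
           .forEachRemaining(System.out::println);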

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
deleted file mode 100644
index 10153b0..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
-import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphMemorySparkTest extends AbstractSparkTest {
-
-    @Test
-    public void shouldPersistGraphAndMemory() throws Exception {
-        final String outputLocation = "target/test-output/" + UUID.randomUUID();
-        final Configuration configuration = getBaseConfiguration(SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        /////
-        Graph graph = GraphFactory.open(configuration);
-        final ComputerResult result = graph.compute(SparkGraphComputer.class).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
-        /////
-        final Storage storage = SparkContextStorage.open("local[4]");
-
-        assertEquals(2, storage.ls().size());
-        // TEST GRAPH PERSISTENCE
-        assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
-        assertEquals(6, IteratorUtils.count(storage.head(Constants.getGraphLocation(outputLocation), Tuple2.class)));
-        assertEquals(6, result.graph().traversal().V().count().next().longValue());
-        assertEquals(0, result.graph().traversal().E().count().next().longValue());
-        assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
-        assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
-        /////
-        // TEST MEMORY PERSISTENCE
-        assertEquals(2, (int) result.memory().get("clusterCount"));
-        assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
-        assertEquals(2, storage.head(Constants.getMemoryLocation(outputLocation, "clusterCount"), Tuple2.class).next()._2());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 954cdfe..44e0949 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -21,13 +21,9 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
 
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
@@ -47,7 +43,9 @@ import org.junit.Test;
 
 import java.util.UUID;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -214,7 +212,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
     public void testComplexChain() throws Exception {
         Spark.create("local[4]");
 
-        final String rddLocation = "target/test-output/graphRDD";
+        final String rddLocation = "target/test-output/" + UUID.randomUUID();
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
         configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
@@ -225,6 +223,8 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddLocation);
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+
+        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddLocation)));
         Graph graph = GraphFactory.open(configuration);
         graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
         GraphTraversalSource g = graph.traversal();
@@ -262,7 +262,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         ////
         graph = GraphFactory.open(configuration);
         graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
-        assertFalse(Spark.hasRDD(rddLocation));
+        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddLocation)));
         g = graph.traversal();
         assertEquals(0l, g.V().count().next().longValue());
         assertEquals(0l, g.E().count().next().longValue());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
new file mode 100644
index 0000000..43e8508
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkContextStorageTest extends AbstractSparkTest {
+
+    @Test
+    public void shouldPersistGraphAndMemory() throws Exception {
+        final String outputLocation = "target/test-output/" + UUID.randomUUID();
+        final Configuration configuration = getBaseConfiguration(SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        /////
+        Graph graph = GraphFactory.open(configuration);
+        final ComputerResult result = graph.compute(SparkGraphComputer.class).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
+        /////
+        final Storage storage = SparkContextStorage.open("local[4]");
+
+        assertEquals(2, storage.ls().size());
+        // TEST GRAPH PERSISTENCE
+        assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
+        assertEquals(6, IteratorUtils.count(storage.headGraph(outputLocation, PersistedInputRDD.class)));
+        assertEquals(6, result.graph().traversal().V().count().next().longValue());
+        assertEquals(0, result.graph().traversal().E().count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
+        /////
+        // TEST MEMORY PERSISTENCE
+        assertEquals(2, (int) result.memory().get("clusterCount"));
+        assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
+        assertEquals(2, storage.headMemory(outputLocation, "clusterCount", PersistedInputRDD.class).next().getValue());
+    }
+
+}