You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/08 15:20:29 UTC
[04/17] incubator-tinkerpop git commit: Greatly greatly simplified
Hadoop OLTP and interactions with HDFS and SparkContext. The trend -- dir/~g
for graphs and dir/x for memory. A consistent persistence schema makes
everything so much simpler. I always as
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 2db267f..887e2f9 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -19,11 +19,18 @@
package org.apache.tinkerpop.gremlin.spark.structure.io;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.collection.JavaConversions;
@@ -114,11 +121,53 @@ public final class SparkContextStorage implements Storage {
}
@Override
- public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
- return IteratorUtils.limit((Iterator) JavaConversions.asJavaIterator(Spark.getRDD(location).toLocalIterator()), totalLines);
+ public Iterator<Vertex> headGraph(final String location, int totalLines, final Class parserClass) { // reads the graph at location with parserClass and returns its vertices, capped by IteratorUtils.limit(..., totalLines)
+ final Configuration configuration = new BaseConfiguration(); // scratch configuration handed to the reader below
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getSearchGraphLocation(location, this).get()); // NOTE(review): .get() is unguarded -- presumably an Optional that is empty when no graph exists under location; confirm
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, parserClass.getCanonicalName()); // the same class name is stored under the RDD key ...
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName()); // ... and under the input-format key, whichever kind parserClass turns out to be
+ try {
+ if (InputRDD.class.isAssignableFrom(parserClass)) { // parserClass is an InputRDD: instantiate it through its no-arg constructor
+ return IteratorUtils.limit(IteratorUtils.map(((InputRDD) parserClass.getConstructor().newInstance()).readGraphRDD(configuration, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> tuple._2().get()), totalLines); // each tuple is mapped to tuple._2().get() before limiting
+ } else if (InputFormat.class.isAssignableFrom(parserClass)) { // parserClass is a Hadoop InputFormat: read through InputFormatRDD
+ return IteratorUtils.limit(IteratorUtils.map(new InputFormatRDD().readGraphRDD(configuration, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> tuple._2().get()), totalLines); // same mapping and limit as the InputRDD branch
+ }
+ } catch (final Exception e) { // catches anything thrown while building the iterator, reflection failures included
+ throw new IllegalArgumentException(e.getMessage(), e); // rethrown unchecked with the original exception kept as cause
+ }
+ throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or a " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName()); // reached only when parserClass is neither an InputRDD nor an InputFormat
+ }
+
+ @Override
+ public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, int totalLines, Class parserClass) { // reads the memory entry memoryKey under location with parserClass and returns KeyValue pairs, capped by IteratorUtils.limit(..., totalLines)
+ final Configuration configuration = new BaseConfiguration(); // scratch configuration handed to the reader below
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getMemoryLocation(location, memoryKey)); // input location is derived from location and memoryKey by Constants.getMemoryLocation
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, parserClass.getCanonicalName()); // the same class name is stored under the RDD key ...
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName()); // ... and under the input-format key, whichever kind parserClass turns out to be
+ try {
+ if (InputRDD.class.isAssignableFrom(parserClass)) { // parserClass is an InputRDD: instantiate it through its no-arg constructor
+ return IteratorUtils.limit(IteratorUtils.map(((InputRDD) parserClass.getConstructor().newInstance()).readMemoryRDD(configuration, memoryKey, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> new KeyValue(tuple._1(), tuple._2())), totalLines); // each tuple becomes a (raw-typed) KeyValue of its two components
+ } else if (InputFormat.class.isAssignableFrom(parserClass)) { // parserClass is a Hadoop InputFormat: read through InputFormatRDD
+ return IteratorUtils.limit(IteratorUtils.map(new InputFormatRDD().readMemoryRDD(configuration, memoryKey, new JavaSparkContext(Spark.getContext())).toLocalIterator(), tuple -> new KeyValue(tuple._1(), tuple._2())), totalLines); // same mapping and limit as the InputRDD branch
+ }
+ } catch (final Exception e) { // catches anything thrown while building the iterator, reflection failures included
+ throw new IllegalArgumentException(e.getMessage(), e); // rethrown unchecked with the original exception kept as cause
+ }
+ throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or a " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName()); // reached only when parserClass is neither an InputRDD nor an InputFormat
}
+ @Override
+ public Iterator<String> head(final String location, final int totalLines) { // string view of the RDD that Spark.getRDD(location) yields, capped by IteratorUtils.limit(..., totalLines)
+ return IteratorUtils.limit(IteratorUtils.map(JavaConversions.asJavaIterator(Spark.getRDD(location).toLocalIterator()), Object::toString), totalLines); // Scala iterator is bridged to a Java iterator, then each element is rendered with Object::toString
+ }
+
+ // TODO: @Override. Returns the toDebugString() of the RDD that Spark.getRDD(location) yields.
public String describe(final String location) {
return Spark.getRDD(location).toDebugString();
}
+
+ @Override
+ public String toString() { // string form of this storage
+ return StringFactory.storageString(Spark.getContext().master()); // built by StringFactory.storageString from the current Spark context's master value
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
deleted file mode 100644
index 10153b0..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.structure.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
-import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphMemorySparkTest extends AbstractSparkTest {
-
- @Test
- public void shouldPersistGraphAndMemory() throws Exception {
- final String outputLocation = "target/test-output/" + UUID.randomUUID();
- final Configuration configuration = getBaseConfiguration(SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
- configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
- /////
- Graph graph = GraphFactory.open(configuration);
- final ComputerResult result = graph.compute(SparkGraphComputer.class).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
- /////
- final Storage storage = SparkContextStorage.open("local[4]");
-
- assertEquals(2, storage.ls().size());
- // TEST GRAPH PERSISTENCE
- assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
- assertEquals(6, IteratorUtils.count(storage.head(Constants.getGraphLocation(outputLocation), Tuple2.class)));
- assertEquals(6, result.graph().traversal().V().count().next().longValue());
- assertEquals(0, result.graph().traversal().E().count().next().longValue());
- assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
- assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
- /////
- // TEST MEMORY PERSISTENCE
- assertEquals(2, (int) result.memory().get("clusterCount"));
- assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
- assertEquals(2, storage.head(Constants.getMemoryLocation(outputLocation, "clusterCount"), Tuple2.class).next()._2());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 954cdfe..44e0949 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -21,13 +21,9 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
@@ -47,7 +43,9 @@ import org.junit.Test;
import java.util.UUID;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -214,7 +212,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
public void testComplexChain() throws Exception {
Spark.create("local[4]");
- final String rddLocation = "target/test-output/graphRDD";
+ final String rddLocation = "target/test-output/" + UUID.randomUUID();
final Configuration configuration = new BaseConfiguration();
configuration.setProperty("spark.master", "local[4]");
configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
@@ -225,6 +223,8 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddLocation);
configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+
+ assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddLocation)));
Graph graph = GraphFactory.open(configuration);
graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
GraphTraversalSource g = graph.traversal();
@@ -262,7 +262,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
////
graph = GraphFactory.open(configuration);
graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
- assertFalse(Spark.hasRDD(rddLocation));
+ assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddLocation)));
g = graph.traversal();
assertEquals(0l, g.V().count().next().longValue());
assertEquals(0l, g.E().count().next().longValue());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
new file mode 100644
index 0000000..43e8508
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkContextStorageTest extends AbstractSparkTest { // exercises SparkContextStorage against a graph and memory written by a SparkGraphComputer job
+
+ @Test
+ public void shouldPersistGraphAndMemory() throws Exception { // runs a job, then inspects its output through the Storage API
+ final String outputLocation = "target/test-output/" + UUID.randomUUID(); // random suffix gives each run its own output location
+ final Configuration configuration = getBaseConfiguration(SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); // base configuration over the tinkerpop-modern.kryo path
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName()); // output goes through PersistedOutputRDD
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+ configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // NOTE(review): presumably keeps the Spark context alive after the job so its RDDs stay reachable -- confirm
+ ///// run PeerPressureVertexProgram followed by ClusterCountMapReduce writing memory key "clusterCount"
+ Graph graph = GraphFactory.open(configuration);
+ final ComputerResult result = graph.compute(SparkGraphComputer.class).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
+ ///// open a Storage over the "local[4]" master and inspect what the job left behind
+ final Storage storage = SparkContextStorage.open("local[4]");
+
+ assertEquals(2, storage.ls().size()); // presumably the graph RDD plus the clusterCount memory RDD -- confirm
+ // TEST GRAPH PERSISTENCE
+ assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
+ assertEquals(6, IteratorUtils.count(storage.headGraph(outputLocation, PersistedInputRDD.class))); // headGraph over the output location yields 6 vertices
+ assertEquals(6, result.graph().traversal().V().count().next().longValue());
+ assertEquals(0, result.graph().traversal().E().count().next().longValue()); // the result graph carries no edges
+ assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+ assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue()); // every vertex has a PeerPressureVertexProgram.CLUSTER value
+ ///// memory checks follow
+ // TEST MEMORY PERSISTENCE
+ assertEquals(2, (int) result.memory().get("clusterCount"));
+ assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
+ assertEquals(2, storage.headMemory(outputLocation, "clusterCount", PersistedInputRDD.class).next().getValue()); // first KeyValue read back from storage carries the same count, 2
+ }
+
+}