You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/01/11 17:49:10 UTC
[02/30] incubator-tinkerpop git commit: added Storage to
gremlin-core. Storage is an interface that OLAP systems can implement. It
provides ls(), rmr(), rm(),
etc. type methods that make it easy for users to interact (via a common
interface) with the unde
added Storage to gremlin-core. Storage is an interface that OLAP systems can implement. It provides ls(), rmr(), rm(), etc. type methods that make it easy for users to interact (via a common interface) with the underlying persistence system. Now both HDFS and Spark provide their own Storage implementations and TADA. Really pretty.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/58d92407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/58d92407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/58d92407
Branch: refs/heads/TINKERPOP-320
Commit: 58d9240764cd6e1f3779097966c53058264e00e6
Parents: f3ebed0
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 9 13:46:43 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 9 13:46:43 2015 -0700
----------------------------------------------------------------------
.../peerpressure/ClusterCountMapReduce.java | 7 +-
.../tinkerpop/gremlin/structure/io/Storage.java | 82 ++++++++
.../gremlin/structure/util/StringFactory.java | 18 +-
.../hadoop/groovy/plugin/HadoopLoader.groovy | 138 --------------
.../groovy/plugin/HadoopGremlinPlugin.java | 7 +-
.../hadoop/structure/io/FileSystemStorage.java | 186 +++++++++++++++++++
.../groovy/plugin/GraphMemoryHDFSCheck.java | 64 +++++++
.../hadoop/groovy/plugin/HadoopPluginSuite.java | 2 +-
.../spark/groovy/plugin/SparkLoader.groovy | 68 -------
.../spark/groovy/plugin/SparkGremlinPlugin.java | 5 +-
.../spark/structure/io/PersistedInputRDD.java | 8 +-
.../spark/structure/io/SparkContextStorage.java | 124 +++++++++++++
.../gremlin/spark/AbstractSparkTest.java | 30 +++
.../groovy/plugin/SparkGremlinPluginTest.java | 10 +-
.../structure/io/GraphMemorySparkTest.java | 75 ++++++++
15 files changed, 593 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
index 1112a46..d343e8e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/ClusterCountMapReduce.java
@@ -64,7 +64,7 @@ public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject,
@Override
public boolean doStage(final Stage stage) {
- return true;
+ return !stage.equals(Stage.COMBINE);
}
@Override
@@ -76,11 +76,6 @@ public class ClusterCountMapReduce extends StaticMapReduce<MapReduce.NullObject,
}
@Override
- public void combine(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
public void reduce(final NullObject key, final Iterator<Serializable> values, final ReduceEmitter<NullObject, Integer> emitter) {
final Set<Serializable> set = new HashSet<>();
values.forEachRemaining(set::add);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
new file mode 100644
index 0000000..1f1bcf4
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.structure.io;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Storage {
+
+ public List<String> ls();
+
+ public List<String> ls(final String location);
+
+ public boolean mkdir(final String location);
+
+ public boolean cp(final String fromLocation, final String toLocation);
+
+ public boolean exists(final String location);
+
+ public boolean rm(final String location);
+
+ public boolean rmr(final String location);
+
+ public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass);
+
+ public default Iterator<Object> head(final String location) {
+ return this.head(location, Object.class);
+ }
+
+ public default Iterator<Object> head(final String location, final int totalLines) {
+ return this.head(location, totalLines, Object.class);
+ }
+
+ public default <V> Iterator<V> head(final String location, final Class<V> objectClass) {
+ return this.head(location, Integer.MAX_VALUE, objectClass);
+ }
+
+ /*
+
+ FileSystem.metaClass.copyToLocal = { final String from, final String to ->
+ return ((FileSystem) delegate).copyToLocalFile(new Path(from), new Path(to));
+ }
+
+ FileSystem.metaClass.copyFromLocal = { final String from, final String to ->
+ return ((FileSystem) delegate).copyFromLocalFile(new Path(from), new Path(to));
+ }
+
+ FileSystem.metaClass.mergeToLocal = { final String from, final String to ->
+ final FileSystem fs = (FileSystem) delegate;
+ final FileSystem local = FileSystem.getLocal(new Configuration());
+ final FSDataOutputStream outA = local.create(new Path(to));
+
+ HDFSTools.getAllFilePaths(fs, new Path(from), HiddenFileFilter.instance()).each {
+ final FSDataInputStream inA = fs.open(it);
+ IOUtils.copyBytes(inA, outA, 8192);
+ inA.close();
+ }
+ outA.close();
+ }
+
+ */
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
index e716a60..9ae8116 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
@@ -18,18 +18,18 @@
*/
package org.apache.tinkerpop.gremlin.structure.util;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalRing;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -75,6 +75,7 @@ public final class StringFactory {
private static final String EMPTY_PROPERTY = "p[empty]";
private static final String EMPTY_VERTEX_PROPERTY = "vp[empty]";
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+ private static final String STORAGE = "storage";
private static final String featuresStartWith = "supports";
private static final int prefixLength = featuresStartWith.length();
@@ -237,4 +238,9 @@ public final class StringFactory {
public static String traversalString(final Traversal.Admin<?, ?> traversal) {
return traversal.getSteps().toString();
}
+
+ public static String storageString(final String internalString) {
+ return STORAGE + L_BRACKET + internalString + R_BRACKET;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy b/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
deleted file mode 100644
index 616c2f0..0000000
--- a/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.*
-import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.io.Text
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritableIterator
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-class HadoopLoader {
-
- private static final String SPACE = " ";
- private static final String D_SPACE = "(D) ";
-
- public static void load() {
-
- FileStatus.metaClass.toString = {
- StringBuilder s = new StringBuilder();
- s.append(((FileStatus) delegate).getPermission()).append(SPACE)
- s.append(((FileStatus) delegate).getOwner()).append(SPACE);
- s.append(((FileStatus) delegate).getGroup()).append(SPACE);
- s.append(((FileStatus) delegate).getLen()).append(SPACE);
- if (((FileStatus) delegate).isDir())
- s.append(D_SPACE);
- s.append(((FileStatus) delegate).getPath().getName());
- return s.toString();
- }
-
- FileSystem.metaClass.ls = { String path ->
- if (null == path || path.equals("/")) path = ((FileSystem) delegate).getHomeDirectory().toString();
- return ((FileSystem) delegate).globStatus(new Path(path + "/*")).collect {
- it.toString()
- };
- }
-
- FileSystem.metaClass.mkdir = { String path ->
- ((FileSystem) delegate).mkdirs(new Path(path));
- }
-
- FileSystem.metaClass.cp = { final String from, final String to ->
- return FileUtil.copy(((FileSystem) delegate), new Path(from), ((FileSystem) delegate), new Path(to), false, new Configuration());
- }
-
- FileSystem.metaClass.exists = { final String path ->
- return ((FileSystem) delegate).exists(new Path(path));
- }
-
- FileSystem.metaClass.rm = { final String path ->
- HDFSTools.globDelete((FileSystem) delegate, path, false);
- }
-
- FileSystem.metaClass.rmr = { final String path ->
- HDFSTools.globDelete((FileSystem) delegate, path, true);
- }
-
- FileSystem.metaClass.copyToLocal = { final String from, final String to ->
- return ((FileSystem) delegate).copyToLocalFile(new Path(from), new Path(to));
- }
-
- FileSystem.metaClass.copyFromLocal = { final String from, final String to ->
- return ((FileSystem) delegate).copyFromLocalFile(new Path(from), new Path(to));
- }
-
- FileSystem.metaClass.mergeToLocal = { final String from, final String to ->
- final FileSystem fs = (FileSystem) delegate;
- final FileSystem local = FileSystem.getLocal(new Configuration());
- final FSDataOutputStream outA = local.create(new Path(to));
-
- HDFSTools.getAllFilePaths(fs, new Path(from), HiddenFileFilter.instance()).each {
- final FSDataInputStream inA = fs.open(it);
- IOUtils.copyBytes(inA, outA, 8192);
- inA.close();
- }
- outA.close();
- }
-
- FileSystem.metaClass.head = { final String path, final int totalLines ->
- return headMaker((FileSystem) delegate, path, totalLines, Text.class);
- }
-
- FileSystem.metaClass.head = { final String path ->
- return headMaker((FileSystem) delegate, path, Integer.MAX_VALUE, Text.class);
- }
-
- FileSystem.metaClass.head = {
- final String path, final Class<org.apache.hadoop.io.Writable> writableClass ->
- return headMaker((FileSystem) delegate, path, Integer.MAX_VALUE, writableClass);
- }
-
- FileSystem.metaClass.head = {
- final String path, final int totalLines, final Class<org.apache.hadoop.io.Writable> writableClass ->
- return headMaker((FileSystem) delegate, path, totalLines, writableClass);
- }
-
- /*FileSystem.metaClass.unzip = { final String from, final String to, final boolean deleteZip ->
- HDFSTools.decompressPath((FileSystem) delegate, from, to, Tokens.BZ2, deleteZip);
- }*/
-
- }
-
- private static Iterator headMaker(
- final FileSystem fs,
- final String path, final int totalLines, final Class<org.apache.hadoop.io.Writable> writableClass) {
- if (writableClass.equals(ObjectWritable.class))
- return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines);
- else if (writableClass.equals(VertexWritable.class))
- return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines);
- else
- return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index d62b4e5..b4f5cd2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceG
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
@@ -86,10 +87,10 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", MapReduceGraphComputer.class.getName()));
///
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", HadoopGraph.class.getName()));
- pluginAcceptor.eval(HadoopLoader.class.getCanonicalName() + ".load()");
+ //pluginAcceptor.eval(HadoopLoader.class.getCanonicalName() + ".load()");
- pluginAcceptor.addBinding("hdfs", FileSystem.get(new Configuration()));
- pluginAcceptor.addBinding("local", FileSystem.getLocal(new Configuration()));
+ pluginAcceptor.addBinding("hdfs", new FileSystemStorage(FileSystem.get(new Configuration())));
+ pluginAcceptor.addBinding("local", new FileSystemStorage(FileSystem.getLocal(new Configuration())));
if (null == System.getenv(Constants.HADOOP_GREMLIN_LIBS))
HadoopGraph.LOGGER.warn("Be sure to set the environmental variable: " + Constants.HADOOP_GREMLIN_LIBS);
else
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
new file mode 100644
index 0000000..56dfe52
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class FileSystemStorage implements Storage {
+
+ private static final String SPACE = " ";
+ private static final String D_SPACE = "(D) ";
+
+ private final FileSystem fs;
+
+ public FileSystemStorage(final FileSystem fileSystem) {
+ this.fs = fileSystem;
+ }
+
+ private static String fileStatusString(final FileStatus status) {
+ StringBuilder s = new StringBuilder();
+ s.append(status.getPermission()).append(" ");
+ s.append(status.getOwner()).append(SPACE);
+ s.append(status.getGroup()).append(SPACE);
+ s.append(status.getLen()).append(SPACE);
+ if (status.isDir())
+ s.append(D_SPACE);
+ s.append(status.getPath().getName());
+ return s.toString();
+ }
+
+ @Override
+ public List<String> ls() {
+ return this.ls("/");
+ }
+
+ @Override
+ public List<String> ls(final String location) {
+ try {
+ final String newLocation;
+ newLocation = location.equals("/") ? this.fs.getHomeDirectory().toString() : location;
+ return Stream.of(this.fs.globStatus(new Path(newLocation + "/*"))).map(FileSystemStorage::fileStatusString).collect(Collectors.toList());
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean mkdir(final String location) {
+ try {
+ return this.fs.mkdirs(new Path(location));
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean cp(final String fromLocation, final String toLocation) {
+ try {
+ return FileUtil.copy(this.fs, new Path(fromLocation), this.fs, new Path(toLocation), false, new Configuration());
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean exists(final String location) {
+ try {
+ return this.fs.exists(new Path(location));
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean rm(final String location) {
+ try {
+ return HDFSTools.globDelete(this.fs, location, false);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean rmr(final String location) {
+ try {
+ return HDFSTools.globDelete(this.fs, location, true);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
+ return headMaker(this.fs, location, totalLines, (Class<? extends Writable>) objectClass);
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.storageString(this.fs.toString());
+ }
+
+ private static Iterator headMaker(final FileSystem fs, final String path, final int totalLines, final Class<? extends Writable> writableClass) {
+ try {
+ if (writableClass.equals(ObjectWritable.class))
+ return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines);
+ else if (writableClass.equals(VertexWritable.class))
+ return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines);
+ else
+ return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ /////////
+
+ public void copyToLocal(final String fromLocation, final String toLocation) {
+ try {
+ this.fs.copyToLocalFile(new Path(fromLocation), new Path(toLocation));
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void copyFromLocal(final String fromLocation, final String toLocation) {
+ try {
+ this.fs.copyFromLocalFile(new Path(fromLocation), new Path(toLocation));
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void mergeToLocal(final String fromLocation, final String toLocation) {
+ try {
+ final FileSystem local = FileSystem.getLocal(new Configuration());
+ final FSDataOutputStream outA = local.create(new Path(toLocation));
+ for (final Path path : HDFSTools.getAllFilePaths(fs, new Path(fromLocation), HiddenFileFilter.instance())) {
+ final FSDataInputStream inA = fs.open(path);
+ IOUtils.copyBytes(inA, outA, 8192);
+ inA.close();
+ }
+ outA.close();
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
new file mode 100644
index 0000000..d47ce43
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphMemoryHDFSCheck extends AbstractGremlinTest {
+
+ @Test
+ @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
+ public void shouldPersistGraphAndMemory() throws Exception {
+ final ComputerResult result = graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
+ /////
+ final Storage storage = new FileSystemStorage(FileSystem.get(ConfUtil.makeHadoopConfiguration(graph.configuration())));
+ // TEST GRAPH PERSISTENCE
+ assertTrue(storage.exists(Constants.getGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))));
+ assertEquals(6, result.graph().traversal().V().count().next().longValue());
+ assertEquals(0, result.graph().traversal().E().count().next().longValue());
+ assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+ assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
+ assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue());
+ /////
+ // TEST MEMORY PERSISTENCE
+ assertEquals(2, (int) result.memory().get("clusterCount"));
+ assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")));
+ // System.out.println(IteratorUtils.list(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
+// assertEquals(1, IteratorUtils.count(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
+ assertEquals(2, storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")).next());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
index 6c4cc20..7dc8143 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
@@ -29,6 +29,6 @@ import org.junit.runners.model.RunnerBuilder;
*/
public class HadoopPluginSuite extends AbstractGremlinSuite {
public HadoopPluginSuite(final Class<?> klass, final RunnerBuilder builder) throws InitializationError {
- super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class}, true, TraversalEngine.Type.COMPUTER);
+ super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, true, TraversalEngine.Type.COMPUTER);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy
deleted file mode 100644
index 53d385f..0000000
--- a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkLoader.groovy
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.groovy.plugin
-
-import org.apache.spark.rdd.RDD
-import org.apache.tinkerpop.gremlin.spark.structure.Spark
-import scala.collection.JavaConversions
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-class SparkLoader {
-
- public static void load() {
-
- Spark.metaClass.static.ls = {
- final List<String> rdds = new ArrayList<>();
- for (final RDD<?> rdd : Spark.getRDDs()) {
- rdds.add(rdd.name() + " [" + rdd.getStorageLevel().description() + "]")
- }
- return rdds;
- }
-
- Spark.metaClass.static.rm = { final String rddName ->
- for (final RDD<?> rdd : Spark.getRDDs()) {
- if (rdd.name().matches(rddName.replace(".", "\\.").replace("*", ".*")))
- Spark.removeRDD(rdd.name());
- }
- }
-
- Spark.metaClass.static.head = { final String rddName ->
- return Spark.head(rddName, Integer.MAX_VALUE);
- }
-
- Spark.metaClass.static.head = { final String rddName, final int totalLines ->
- final List<Object> data = new ArrayList<>();
- final Iterator<?> itty = JavaConversions.asJavaIterator(Spark.getRDD(rddName).toLocalIterator());
- for (int i = 0; i < totalLines; i++) {
- if (itty.hasNext())
- data.add(itty.next());
- else
- break;
- }
- return data;
- }
-
- Spark.metaClass.static.describe = { final String rddName ->
- return Spark.getRDD(rddName).toDebugString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
index 9351a1e..7711435 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.PluginInitializationException;
import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import java.util.HashSet;
import java.util.Optional;
@@ -43,6 +44,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
add("import org.apache.log4j.*");
add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + Spark.class.getPackage().getName() + DOT_STAR);
+ add(IMPORT_SPACE + SparkContextStorage.class.getPackage().getName() + DOT_STAR);
}};
@Override
@@ -56,8 +58,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
try {
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", SparkGraphComputer.class.getName()));
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.ERROR)", MetricsSystem.class.getName()));
- pluginAcceptor.eval("spark = Spark");
- pluginAcceptor.eval(SparkLoader.class.getCanonicalName() + ".load()");
+ pluginAcceptor.eval("spark = SparkContextStorage.open()");
} catch (final Exception e) {
throw new PluginInitializationException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
index 52d18f1..55bf53b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -36,8 +36,9 @@ public final class PersistedInputRDD implements InputRDD {
public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
- final String graphRDDName = Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
Spark.create(sparkContext.sc());
+ final String inputLocation = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+ final String graphRDDName = Spark.hasRDD(inputLocation) ? inputLocation : Constants.getGraphLocation(inputLocation);
return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(graphRDDName).toJavaRDD());
}
@@ -45,8 +46,9 @@ public final class PersistedInputRDD implements InputRDD {
public <K, V> JavaPairRDD<K, V> readMemoryRDD(final Configuration configuration, final String memoryKey, final JavaSparkContext sparkContext) {
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
- final String sideEffectRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), memoryKey);
+ final String inputLocation = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
+ final String memoryRDDName = Spark.hasRDD(inputLocation) ? inputLocation : Constants.getMemoryLocation(inputLocation, memoryKey);
Spark.create(sparkContext.sc());
- return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(sideEffectRDDName).toJavaRDD());
+ return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(memoryRDDName).toJavaRDD());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
new file mode 100644
index 0000000..2db267f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkContext;
+import org.apache.spark.rdd.RDD;
+import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.collection.JavaConversions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link Storage} implementation whose entries are the named RDDs tracked by {@link Spark}.
+ * A location is an RDD name. {@link #ls(String)} and {@link #rmr(String)} additionally accept
+ * {@code *} as a wildcard that matches any run of characters; every other character of the
+ * location is matched literally.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkContextStorage implements Storage {
+
+    /** Instances are obtained through the {@code open(...)} factory methods. */
+    private SparkContextStorage() {
+
+    }
+
+    /**
+     * Opens a storage view without creating or configuring anything through {@link Spark}.
+     *
+     * @return a new storage instance
+     */
+    public static SparkContextStorage open() {
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Calls {@code Spark.create(master)} and then opens a storage view.
+     *
+     * @param master the Spark master, e.g. {@code local[4]}
+     * @return a new storage instance
+     */
+    public static SparkContextStorage open(final String master) {
+        Spark.create(master);
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Calls {@code Spark.create(configuration)} and then opens a storage view.
+     *
+     * @param configuration the configuration handed to {@link Spark}
+     * @return a new storage instance
+     */
+    public static SparkContextStorage open(final Configuration configuration) {
+        Spark.create(configuration);
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Calls {@code Spark.create(sparkContext)} and then opens a storage view.
+     *
+     * @param sparkContext the context handed to {@link Spark}
+     * @return a new storage instance
+     */
+    public static SparkContextStorage open(final SparkContext sparkContext) {
+        Spark.create(sparkContext);
+        return new SparkContextStorage();
+    }
+
+    /**
+     * Lists every RDD known to {@link Spark}; equivalent to {@code ls("*")}.
+     *
+     * @return one entry per RDD in the format produced by {@link #ls(String)}
+     */
+    @Override
+    public List<String> ls() {
+        return ls("*");
+    }
+
+    /**
+     * Lists the RDDs whose name matches the given location.
+     *
+     * @param location an RDD name, optionally containing {@code *} wildcards
+     * @return one entry per match, formatted as {@code name [storage level description]}
+     */
+    @Override
+    public List<String> ls(final String location) {
+        final List<String> rdds = new ArrayList<>();
+        final java.util.regex.Pattern pattern = toWildcardPattern(location);
+        for (final RDD<?> rdd : Spark.getRDDs()) {
+            final String name = rdd.name();
+            if (pattern.matcher(name).matches())
+                rdds.add(name + " [" + rdd.getStorageLevel().description() + "]");
+        }
+        return rdds;
+    }
+
+    /**
+     * Not supported: this storage has no notion of a directory.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public boolean mkdir(final String location) {
+        throw new UnsupportedOperationException("This operation does not make sense for a persited SparkContext");
+    }
+
+    /**
+     * Renames the RDD registered under {@code fromLocation} to {@code toLocation}, caches it, and
+     * then asks {@link Spark} to remove {@code fromLocation}.
+     * <p>
+     * NOTE(review): because the source name is removed afterwards, this reads like a move rather
+     * than a copy, and what {@code Spark.removeRDD} does to the just-renamed RDD depends on its
+     * implementation, which is not visible here -- confirm.
+     *
+     * @param fromLocation the current RDD name
+     * @param toLocation   the new RDD name
+     * @return always {@code true}
+     */
+    @Override
+    public boolean cp(final String fromLocation, final String toLocation) {
+        Spark.getRDD(fromLocation).setName(toLocation).cache();
+        Spark.removeRDD(fromLocation);
+        return true;
+    }
+
+    /**
+     * @param location an exact RDD name (no wildcard handling)
+     * @return the result of {@code Spark.hasRDD(location)}
+     */
+    @Override
+    public boolean exists(final String location) {
+        return Spark.hasRDD(location);
+    }
+
+    /**
+     * Removes the single RDD with exactly the given name.
+     *
+     * @param location an exact RDD name (no wildcard handling)
+     * @return {@code false} if no such RDD is known, {@code true} after removing it
+     */
+    @Override
+    public boolean rm(final String location) {
+        if (!Spark.hasRDD(location))
+            return false;
+        Spark.removeRDD(location);
+        return true;
+    }
+
+    /**
+     * Removes every RDD whose name matches the given location.
+     *
+     * @param location an RDD name, optionally containing {@code *} wildcards
+     * @return {@code true} if at least one RDD matched
+     */
+    @Override
+    public boolean rmr(final String location) {
+        final List<String> rdds = new ArrayList<>();
+        final java.util.regex.Pattern pattern = toWildcardPattern(location);
+        // Collect the names first and remove afterwards, so the collection returned by
+        // Spark.getRDDs() is never modified while it is being iterated.
+        for (final RDD<?> rdd : Spark.getRDDs()) {
+            if (pattern.matcher(rdd.name()).matches())
+                rdds.add(rdd.name());
+        }
+        rdds.forEach(Spark::removeRDD);
+        return !rdds.isEmpty();
+    }
+
+    /**
+     * Returns the leading elements of the named RDD via its local iterator.
+     *
+     * @param location    an exact RDD name
+     * @param totalLines  the maximum number of elements to return
+     * @param objectClass the expected element type; not consulted by this implementation
+     * @return an iterator limited to {@code totalLines} elements
+     */
+    @Override
+    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
+        return IteratorUtils.limit((Iterator) JavaConversions.asJavaIterator(Spark.getRDD(location).toLocalIterator()), totalLines);
+    }
+
+    /**
+     * @param location an exact RDD name
+     * @return the RDD's {@code toDebugString()} output
+     */
+    public String describe(final String location) {
+        return Spark.getRDD(location).toDebugString();
+    }
+
+    /**
+     * Translates a location into a regular expression in which {@code *} matches any run of
+     * characters and everything else is quoted, so names containing regex metacharacters such
+     * as {@code [}, {@code (}, {@code +} or {@code $} are matched literally.
+     */
+    private static java.util.regex.Pattern toWildcardPattern(final String location) {
+        final StringBuilder regex = new StringBuilder();
+        // A limit of -1 keeps trailing empty segments, so a trailing '*' is not lost.
+        final String[] literals = location.split("\\*", -1);
+        for (int i = 0; i < literals.length; i++) {
+            if (i > 0)
+                regex.append(".*");
+            regex.append(java.util.regex.Pattern.quote(literals[i]));
+        }
+        return java.util.regex.Pattern.compile(regex.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
index ccff1ab..3fc2a59 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java
@@ -19,10 +19,21 @@
package org.apache.tinkerpop.gremlin.spark;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
import org.junit.After;
import org.junit.Before;
@@ -43,4 +54,23 @@ public abstract class AbstractSparkTest {
Spark.close();
System.out.println("SparkContext has been closed for " + this.getClass().getCanonicalName() + "-setupTest");
}
+
+ /**
+  * Builds the configuration used by the Spark tests: a local four-thread master, the Gryo
+  * serializer, a {@link HadoopGraph} reading from {@code inputLocation}, and
+  * {@link PersistedOutputRDD} as the graph output RDD.
+  *
+  * @param inputLocation the graph input location. A location containing {@code .kryo} or
+  *                      {@code .json} selects the matching Hadoop input format; any other
+  *                      location is read through {@link PersistedInputRDD}.
+  * @return a new configuration
+  */
+ protected Configuration getBaseConfiguration(final String inputLocation) {
+     final BaseConfiguration configuration = new BaseConfiguration();
+     configuration.setDelimiterParsingDisabled(true);
+     configuration.setProperty("spark.master", "local[4]");
+     configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+     configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+     configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+     // Read from the location the caller asked for, so the input format / input RDD chosen
+     // below always matches the data that is actually configured.
+     configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, inputLocation);
+     if (inputLocation.contains(".kryo")) {
+         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+     } else if (inputLocation.contains(".json")) {
+         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GraphSONInputFormat.class.getCanonicalName());
+     } else {
+         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+     }
+
+     configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+     return configuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
index 0b60825..7574908 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/plugin/SparkGremlinPluginTest.java
@@ -34,9 +34,11 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.junit.Before;
import org.junit.Test;
+import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -97,9 +99,9 @@ public class SparkGremlinPluginTest extends AbstractSparkTest {
assertEquals(1, ((List<String>) this.console.eval("spark.ls()")).size());
assertTrue(((List<String>) this.console.eval("spark.ls()")).contains("target/test-output/graph-1/~g [Memory Deserialized 1x Replicated]"));
- assertEquals(6, ((List<Object>) this.console.eval("spark.head('target/test-output/graph-1/~g')")).size());
+ assertEquals(6, IteratorUtils.count(((Iterator<Object>) this.console.eval("spark.head('target/test-output/graph-1/~g')"))));
- this.console.eval("spark.rm('target/test-output/graph-*')");
+ this.console.eval("spark.rmr('target/test-output/graph-*')");
assertEquals(0, ((List<String>) this.console.eval("spark.ls()")).size());
//////
@@ -116,9 +118,9 @@ public class SparkGremlinPluginTest extends AbstractSparkTest {
this.console.eval("graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().iterations(1).create()).submit().get()");
assertEquals(3, ((List<String>) this.console.eval("spark.ls()")).size());
- this.console.eval("spark.rm('target/test-output/graph-*')");
+ this.console.eval("spark.rmr('target/test-output/graph-*')");
assertEquals(1, ((List<String>) this.console.eval("spark.ls()")).size());
- this.console.eval("spark.rm('*')");
+ this.console.eval("spark.rmr('*')");
assertEquals(0, ((List<String>) this.console.eval("spark.ls()")).size());
//
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/58d92407/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
new file mode 100644
index 0000000..10153b0
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GraphMemorySparkTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that a Spark job run with {@link Constants#GREMLIN_SPARK_PERSIST_CONTEXT} enabled and
+ * {@link PersistedOutputRDD} as its output leaves both the resulting graph and the map-reduce
+ * memory reachable through a {@link Storage} opened on the same master.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphMemorySparkTest extends AbstractSparkTest {
+
+    /**
+     * Runs {@link PeerPressureVertexProgram} followed by a {@link ClusterCountMapReduce} keyed
+     * as {@code clusterCount}, then inspects the output location through the storage API.
+     */
+    @Test
+    public void shouldPersistGraphAndMemory() throws Exception {
+        // A random output location keeps this run separate from any other test output.
+        final String outputLocation = "target/test-output/" + UUID.randomUUID();
+
+        final Configuration conf = getBaseConfiguration(SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
+        conf.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+
+        // Execute the job.
+        final Graph inputGraph = GraphFactory.open(conf);
+        final ComputerResult computed = inputGraph.compute(SparkGraphComputer.class)
+                .program(PeerPressureVertexProgram.build().create(inputGraph))
+                .mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create())
+                .submit()
+                .get();
+
+        // Inspect what the job left behind.
+        final Storage sparkStorage = SparkContextStorage.open("local[4]");
+        final String graphLocation = Constants.getGraphLocation(outputLocation);
+        final String memoryLocation = Constants.getMemoryLocation(outputLocation, "clusterCount");
+
+        // Two entries are expected; the graph and memory locations are checked one by one below.
+        assertEquals(2, sparkStorage.ls().size());
+
+        // Graph persistence.
+        assertTrue(sparkStorage.exists(graphLocation));
+        assertEquals(6, IteratorUtils.count(sparkStorage.head(graphLocation, Tuple2.class)));
+        final Graph computedGraph = computed.graph();
+        assertEquals(6, computedGraph.traversal().V().count().next().longValue());
+        assertEquals(0, computedGraph.traversal().E().count().next().longValue());
+        assertEquals(6, computedGraph.traversal().V().values("name").count().next().longValue());
+        assertEquals(6, computedGraph.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
+
+        // Memory persistence.
+        assertEquals(2, (int) computed.memory().get("clusterCount"));
+        assertTrue(sparkStorage.exists(memoryLocation));
+        assertEquals(2, sparkStorage.head(memoryLocation, Tuple2.class).next()._2());
+    }
+
+}