You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:37 UTC
[06/30] incubator-tinkerpop git commit: created a GraphFilter
container object that makes storing and applying filters easy. Very clean
model. GraphFilter will next contain stuff like inferences on the filters so
easy push-down predicates are available to the graph system provider.
created a GraphFilter container object that makes storing and applying filters easy. Very clean model. GraphFilter will next contain stuff like inferences on the filters so easy push-down predicates are available to the graph system provider.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/bc417dbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/bc417dbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/bc417dbf
Branch: refs/heads/master
Commit: bc417dbf01fee817aa325ee8e4b582fef8ab6788
Parents: 64c6840
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 1 17:44:19 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 1 17:44:19 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphGraphComputer.java | 2 +-
.../structure/io/GiraphVertexInputFormat.java | 21 ++---
.../giraph/structure/io/GiraphVertexReader.java | 14 ++-
.../gremlin/process/computer/GraphFilter.java | 89 ++++++++++++++++++++
.../tinkerpop/gremlin/hadoop/Constants.java | 3 +-
.../computer/AbstractHadoopGraphComputer.java | 8 +-
.../structure/io/CommonFileInputFormat.java | 26 ++----
.../hadoop/structure/io/GraphFilterAware.java | 26 +++---
.../structure/io/gryo/GryoInputFormat.java | 2 +-
.../structure/io/gryo/GryoRecordReader.java | 13 ++-
.../structure/io/script/ScriptInputFormat.java | 2 +-
.../structure/io/script/ScriptRecordReader.java | 12 ++-
.../spark/process/computer/SparkExecutor.java | 28 ++----
.../process/computer/SparkGraphComputer.java | 13 ++-
14 files changed, 148 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 2503838..7e5a998 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -146,7 +146,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
try {
// store vertex and edge filters (will propagate down to native InputFormat or else GiraphVertexInputFormat will process)
final Configuration apacheConfiguration = new BaseConfiguration();
- GraphFilterAware.storeVertexAndEdgeFilters(apacheConfiguration, this.giraphConfiguration, this.vertexFilter, this.edgeFilter);
+ GraphFilterAware.storeGraphFilter(apacheConfiguration, this.giraphConfiguration, this.graphFilter);
// it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
if (null != this.vertexProgram) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
index cb802fb..76fe3ba 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexInputFormat.java
@@ -31,10 +31,8 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.io.IOException;
import java.util.List;
@@ -45,9 +43,8 @@ import java.util.List;
public final class GiraphVertexInputFormat extends VertexInputFormat {
private InputFormat<NullWritable, VertexWritable> hadoopGraphInputFormat;
- protected Traversal.Admin<Vertex, Vertex> vertexFilter = null;
- protected Traversal.Admin<Edge, Edge> edgeFilter = null;
- private boolean filtersLoader = false;
+ protected GraphFilter graphFilter = new GraphFilter();
+ private boolean graphFilterLoaded = false;
private boolean graphFilterAware;
@Override
@@ -65,7 +62,7 @@ public final class GiraphVertexInputFormat extends VertexInputFormat {
public VertexReader createVertexReader(final InputSplit split, final TaskAttemptContext context) throws IOException {
this.constructor(context.getConfiguration());
try {
- return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context), this.graphFilterAware, this.vertexFilter, this.edgeFilter);
+ return new GiraphVertexReader(this.hadoopGraphInputFormat.createRecordReader(split, context), this.graphFilterAware, this.graphFilter);
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -75,12 +72,10 @@ public final class GiraphVertexInputFormat extends VertexInputFormat {
if (null == this.hadoopGraphInputFormat) {
this.hadoopGraphInputFormat = ReflectionUtils.newInstance(configuration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class), configuration);
this.graphFilterAware = this.hadoopGraphInputFormat instanceof GraphFilterAware;
- if (!this.filtersLoader) {
- if (configuration.get(Constants.GREMLIN_HADOOP_VERTEX_FILTER, null) != null)
- this.vertexFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_VERTEX_FILTER);
- if (configuration.get(Constants.GREMLIN_HADOOP_EDGE_FILTER, null) != null)
- this.edgeFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_EDGE_FILTER);
- this.filtersLoader = true;
+ if (!this.graphFilterLoaded) {
+ if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+ this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ this.graphFilterLoaded = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
index b4a81fb..d67937c 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphVertexReader.java
@@ -25,11 +25,9 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.CommonFileInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import java.io.IOException;
@@ -39,15 +37,13 @@ import java.io.IOException;
public final class GiraphVertexReader extends VertexReader {
private RecordReader<NullWritable, VertexWritable> recordReader;
- private final Traversal.Admin<org.apache.tinkerpop.gremlin.structure.Vertex, org.apache.tinkerpop.gremlin.structure.Vertex> vertexFilter;
- private final Traversal.Admin<Edge, Edge> edgeFilter;
+ private final GraphFilter graphFilter;
private final boolean graphFilterAware;
- public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader, final boolean graphFilterAware, final Traversal.Admin<org.apache.tinkerpop.gremlin.structure.Vertex, org.apache.tinkerpop.gremlin.structure.Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+ public GiraphVertexReader(final RecordReader<NullWritable, VertexWritable> recordReader, final boolean graphFilterAware, final GraphFilter graphFilter) {
this.recordReader = recordReader;
this.graphFilterAware = graphFilterAware;
- this.vertexFilter = null == vertexFilter ? null : vertexFilter.clone();
- this.edgeFilter = null == edgeFilter ? null : edgeFilter.clone();
+ this.graphFilter = graphFilter.clone();
}
@Override
@@ -63,7 +59,7 @@ public final class GiraphVertexReader extends VertexReader {
while (true) {
if (this.recordReader.nextKeyValue()) {
final VertexWritable vertexWritable = this.recordReader.getCurrentValue();
- final org.apache.tinkerpop.gremlin.structure.Vertex vertex = GraphFilterAware.applyVertexAndEdgeFilters(vertexWritable.get(), this.vertexFilter, this.edgeFilter);
+ final org.apache.tinkerpop.gremlin.structure.Vertex vertex = GraphFilterAware.applyGraphFilter(vertexWritable.get(), this.graphFilter);
if (null != vertex) {
vertexWritable.set(vertex);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
new file mode 100644
index 0000000..ea2080a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.io.Serializable;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphFilter implements Cloneable, Serializable {
+
+ private Traversal.Admin<Vertex, Vertex> vertexFilter = null;
+ private Traversal.Admin<Edge, Edge> edgeFilter = null;
+
+ public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter) {
+ this.vertexFilter = vertexFilter.asAdmin().clone();
+ }
+
+ public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter) {
+ this.edgeFilter = edgeFilter.asAdmin().clone();
+ }
+
+ public boolean hasVertexFilter() {
+ return this.vertexFilter != null;
+ }
+
+ public final Traversal.Admin<Vertex, Vertex> getVertexFilter() {
+ return this.vertexFilter;
+ }
+
+ public final Traversal.Admin<Edge, Edge> getEdgeFilter() {
+ return this.edgeFilter;
+ }
+
+ public boolean hasEdgeFilter() {
+ return this.edgeFilter != null;
+ }
+
+ public boolean hasFilter() {
+ return this.vertexFilter != null || this.edgeFilter != null;
+ }
+
+ @Override
+ public GraphFilter clone() {
+ try {
+ final GraphFilter clone = (GraphFilter) super.clone();
+ if (null != this.vertexFilter)
+ clone.vertexFilter = this.vertexFilter.clone();
+ if (null != this.edgeFilter)
+ clone.edgeFilter = this.edgeFilter.clone();
+ return clone;
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (!this.hasFilter())
+ return "graphfilter[none]";
+ else if (this.hasVertexFilter() && this.hasEdgeFilter())
+ return "graphfilter[" + this.vertexFilter + "," + this.edgeFilter + "]";
+ else if (this.hasVertexFilter())
+ return "graphfilter[" + this.vertexFilter + "]";
+ else
+ return "graphfilter[" + this.edgeFilter + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index baca3d7..e6b88f1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -38,8 +38,7 @@ public final class Constants {
public static final String GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphInputFormat.hasEdges";
public static final String GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES = "gremlin.hadoop.graphOutputFormat.hasEdges";
- public static final String GREMLIN_HADOOP_VERTEX_FILTER = "gremlin.hadoop.vertexFilter";
- public static final String GREMLIN_HADOOP_EDGE_FILTER = "gremlin.hadoop.edgeFilter";
+ public static final String GREMLIN_HADOOP_GRAPH_FILTER = "gremlin.hadoop.graphFilter";
public static final String GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE = "gremlin.hadoop.jarsInDistributedCache";
public static final String HIDDEN_G = Graph.Hidden.hide("g");
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 2ed6d9f..a25cebd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
@@ -55,8 +56,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
protected ResultGraph resultGraph = null;
protected Persist persist = null;
- protected Traversal.Admin<Vertex, Vertex> vertexFilter = null;
- protected Traversal.Admin<Edge, Edge> edgeFilter = null;
+ protected GraphFilter graphFilter = new GraphFilter();
public AbstractHadoopGraphComputer(final HadoopGraph hadoopGraph) {
this.hadoopGraph = hadoopGraph;
@@ -65,13 +65,13 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
@Override
public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
- this.vertexFilter = vertexFilter.asAdmin();
+ this.graphFilter.setVertexFilter(vertexFilter);
return this;
}
@Override
public GraphComputer edges(final Traversal<Edge, Edge> edgeFilter) {
- this.edgeFilter = edgeFilter.asAdmin();
+ this.graphFilter.setEdgeFilter(edgeFilter);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
index ddb10c3..9f7d458 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/CommonFileInputFormat.java
@@ -27,27 +27,22 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable, VertexWritable> implements HadoopPoolsConfigurable, GraphFilterAware {
- protected Traversal.Admin<Vertex, Vertex> vertexFilter = null;
- protected Traversal.Admin<Edge, Edge> edgeFilter = null;
- private boolean filtersLoader = false;
+ protected GraphFilter graphFilter = new GraphFilter();
+ private boolean graphFilterLoaded = false;
protected void loadVertexAndEdgeFilters(final TaskAttemptContext context) {
- if (!this.filtersLoader) {
- if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_VERTEX_FILTER, null) != null)
- this.vertexFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()), Constants.GREMLIN_HADOOP_VERTEX_FILTER);
- if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_EDGE_FILTER, null) != null)
- this.edgeFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()), Constants.GREMLIN_HADOOP_EDGE_FILTER);
- this.filtersLoader = true;
+ if (!this.graphFilterLoaded) {
+ if (context.getConfiguration().get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
+ this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(context.getConfiguration()), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ this.graphFilterLoaded = true;
}
}
@@ -57,12 +52,7 @@ public abstract class CommonFileInputFormat extends FileInputFormat<NullWritable
}
@Override
- public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter) {
- // do nothing. loaded through configuration.
- }
-
- @Override
- public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter) {
+ public void setGraphFilter(final GraphFilter graphFilter) {
// do nothing. loaded through configuration.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
index 5bb9538..afc0f29 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/GraphFilterAware.java
@@ -21,8 +21,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -40,18 +40,16 @@ import java.util.Iterator;
*/
public interface GraphFilterAware {
- public void setVertexFilter(final Traversal<Vertex, Vertex> vertexFilter);
+ public void setGraphFilter(final GraphFilter graphFilter);
- public void setEdgeFilter(final Traversal<Edge, Edge> edgeFilter);
-
- public static Vertex applyVertexAndEdgeFilters(final Vertex vertex, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+ public static Vertex applyGraphFilter(final Vertex vertex, final GraphFilter graphFilter) {
if (null == vertex)
return null;
- else if (vertexFilter == null || TraversalUtil.test(vertex, vertexFilter)) {
- if (edgeFilter != null) {
+ else if (!graphFilter.hasVertexFilter() || TraversalUtil.test(vertex, graphFilter.getVertexFilter())) {
+ if (graphFilter.hasEdgeFilter()) {
final Iterator<Edge> edgeIterator = vertex.edges(Direction.BOTH);
while (edgeIterator.hasNext()) {
- if (!TraversalUtil.test(edgeIterator.next(), edgeFilter))
+ if (!TraversalUtil.test(edgeIterator.next(), graphFilter.getEdgeFilter()))
edgeIterator.remove();
}
}
@@ -61,14 +59,10 @@ public interface GraphFilterAware {
}
}
- public static void storeVertexAndEdgeFilters(final Configuration apacheConfiguration, final org.apache.hadoop.conf.Configuration hadoopConfiguration, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
- if (null != vertexFilter) {
- VertexProgramHelper.serialize(vertexFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_VERTEX_FILTER);
- hadoopConfiguration.set(Constants.GREMLIN_HADOOP_VERTEX_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_FILTER));
- }
- if (null != edgeFilter) {
- VertexProgramHelper.serialize(edgeFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_EDGE_FILTER);
- hadoopConfiguration.set(Constants.GREMLIN_HADOOP_EDGE_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_EDGE_FILTER));
+ public static void storeGraphFilter(final Configuration apacheConfiguration, final org.apache.hadoop.conf.Configuration hadoopConfiguration, final GraphFilter graphFilter) {
+ if (graphFilter.hasFilter()) {
+ VertexProgramHelper.serialize(graphFilter, apacheConfiguration, Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ hadoopConfiguration.set(Constants.GREMLIN_HADOOP_GRAPH_FILTER, apacheConfiguration.getString(Constants.GREMLIN_HADOOP_GRAPH_FILTER));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
index ebcd03c..78ece7c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoInputFormat.java
@@ -35,7 +35,7 @@ public final class GryoInputFormat extends CommonFileInputFormat {
@Override
public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
super.loadVertexAndEdgeFilters(context);
- final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader(this.vertexFilter, this.edgeFilter);
+ final RecordReader<NullWritable, VertexWritable> reader = new GryoRecordReader(this.graphFilter);
reader.initialize(split, context);
return reader;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
index 82e85f8..7339411 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
@@ -58,12 +57,10 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
private long currentLength = 0;
private long splitLength;
- private final Traversal.Admin<Vertex, Vertex> vertexFilter;
- private final Traversal.Admin<Edge, Edge> edgeFilter;
+ private final GraphFilter graphFilter;
- public GryoRecordReader(final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
- this.vertexFilter = null == vertexFilter ? null : vertexFilter.clone();
- this.edgeFilter = null == edgeFilter ? null : edgeFilter.clone();
+ public GryoRecordReader(final GraphFilter graphFilter) {
+ this.graphFilter = graphFilter.clone();
}
@Override
@@ -131,7 +128,7 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri
terminatorLocation = ((byte) currentByte) == TERMINATOR[terminatorLocation] ? terminatorLocation + 1 : 0;
if (terminatorLocation >= TERMINATOR.length) {
try (InputStream in = new ByteArrayInputStream(output.toByteArray())) {
- final Vertex vertex = GraphFilterAware.applyVertexAndEdgeFilters(this.gryoReader.readVertex(in, Attachable::get), this.vertexFilter, this.edgeFilter); // I know how GryoReader works, so I'm cheating here
+ final Vertex vertex = GraphFilterAware.applyGraphFilter(this.gryoReader.readVertex(in, Attachable::get), this.graphFilter); // I know how GryoReader works, so I'm cheating here
if (null != vertex) {
this.vertexWritable.set(vertex);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
index ef4b59a..8b9af01 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptInputFormat.java
@@ -39,7 +39,7 @@ public final class ScriptInputFormat extends CommonFileInputFormat {
@Override
public RecordReader<NullWritable, VertexWritable> createRecordReader(final InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException {
super.loadVertexAndEdgeFilters(context);
- RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader(this.vertexFilter, this.edgeFilter);
+ RecordReader<NullWritable, VertexWritable> reader = new ScriptRecordReader(this.graphFilter);
reader.initialize(split, context);
return reader;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
index 34aa138..c9b36b5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/script/ScriptRecordReader.java
@@ -31,7 +31,7 @@ import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -59,13 +59,11 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
private final LineRecordReader lineRecordReader;
private ScriptEngine engine;
- private final Traversal.Admin<Vertex, Vertex> vertexFilter;
- private final Traversal.Admin<Edge, Edge> edgeFilter;
+ private final GraphFilter graphFilter;
- public ScriptRecordReader(final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+ public ScriptRecordReader(final GraphFilter graphFilter) {
this.lineRecordReader = new LineRecordReader();
- this.vertexFilter = null == vertexFilter ? null : vertexFilter.clone();
- this.edgeFilter = null == edgeFilter ? null : edgeFilter.clone();
+ this.graphFilter = graphFilter.clone();
}
@Override
@@ -90,7 +88,7 @@ public final class ScriptRecordReader extends RecordReader<NullWritable, VertexW
final Bindings bindings = this.engine.createBindings();
bindings.put(LINE, this.lineRecordReader.getCurrentValue().toString());
bindings.put(FACTORY, new ScriptElementFactory());
- final Vertex vertex = GraphFilterAware.applyVertexAndEdgeFilters((Vertex) engine.eval(READ_CALL, bindings), this.vertexFilter, this.edgeFilter);
+ final Vertex vertex = GraphFilterAware.applyGraphFilter((Vertex) engine.eval(READ_CALL, bindings), this.graphFilter);
if (vertex != null) {
this.vertexWritable.set(vertex);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index d034c92..ba80fbc 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -22,22 +22,19 @@ import com.google.common.base.Optional;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -46,7 +43,6 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -64,24 +60,10 @@ public final class SparkExecutor {
// DATA LOADING //
//////////////////
- public static JavaPairRDD<Object, VertexWritable> filterLoadedGraph(JavaPairRDD<Object, VertexWritable> graphRDD, final Traversal.Admin<Vertex, Vertex> vertexFilter, final Traversal.Admin<Edge, Edge> edgeFilter) {
+ public static JavaPairRDD<Object, VertexWritable> filterLoadedGraph(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
return graphRDD.mapPartitionsToPair(partitionIterator -> {
- final Traversal.Admin<Vertex, Vertex> vFilter = (null == vertexFilter) ? null : vertexFilter.clone();
- final Traversal.Admin<Edge, Edge> eFilter = (null == edgeFilter) ? null : edgeFilter.clone();
- return () -> IteratorUtils.filter(partitionIterator, tuple -> {
- if (null != vFilter && !TraversalUtil.test(tuple._2().get(), vFilter))
- return false;
- else {
- if (null != eFilter) {
- final Iterator<Edge> edgeIterator = tuple._2().get().edges(Direction.BOTH);
- while (edgeIterator.hasNext()) {
- if (!TraversalUtil.test(edgeIterator.next(), eFilter))
- edgeIterator.remove();
- }
- }
- return true;
- }
- });
+ final GraphFilter gFilter = graphFilter.clone();
+ return () -> IteratorUtils.filter(partitionIterator, tuple -> GraphFilterAware.applyGraphFilter(tuple._2().get(), gFilter) != null);
}, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/bc417dbf/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index a93bf06..1b59dd7 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -150,15 +150,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
outputRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class).newInstance();
// if the input class can filter on load, then set the filters
if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class, InputFormat.class))) {
- GraphFilterAware.storeVertexAndEdgeFilters(apacheConfiguration, hadoopConfiguration, this.vertexFilter, this.edgeFilter);
+ GraphFilterAware.storeGraphFilter(apacheConfiguration, hadoopConfiguration, this.graphFilter);
filtered = false;
} else if (inputRDD instanceof GraphFilterAware) {
- if (null != this.vertexFilter)
- ((GraphFilterAware) inputRDD).setVertexFilter(this.vertexFilter);
- if (null != edgeFilter)
- ((GraphFilterAware) inputRDD).setEdgeFilter(this.edgeFilter);
+ ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
filtered = false;
- } else if (null != this.vertexFilter || null != this.edgeFilter) {
+ } else if (this.graphFilter.hasFilter()) {
filtered = true;
} else {
filtered = false;
@@ -195,8 +192,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext);
// if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
if (filtered) {
- this.logger.info("Filtering the loaded graphRDD: " + this.vertexFilter + " and " + this.edgeFilter);
- loadedGraphRDD = SparkExecutor.filterLoadedGraph(loadedGraphRDD, this.vertexFilter, this.edgeFilter);
+ this.logger.info("Filtering the loaded graphRDD: " + this.graphFilter);
+ loadedGraphRDD = SparkExecutor.filterLoadedGraph(loadedGraphRDD, this.graphFilter);
}
// if the loaded graph RDD is already partitioned use that partitioner, else partition it with HashPartitioner
if (loadedGraphRDD.partitioner().isPresent())