You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/05/07 03:00:53 UTC
git commit: updated refs/heads/trunk to afb3ecc
Updated Branches:
refs/heads/trunk e495238bb -> afb3ecce1
GIRAPH-560: Input filtering (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/afb3ecce
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/afb3ecce
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/afb3ecce
Branch: refs/heads/trunk
Commit: afb3ecce139ab7c4037602ad143b5e6424dc48a2
Parents: e495238
Author: Nitay Joffe <ni...@apache.org>
Authored: Mon May 6 20:57:19 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Mon May 6 21:00:45 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/conf/GiraphClasses.java | 29 ++++
.../apache/giraph/conf/GiraphConfiguration.java | 22 +++
.../org/apache/giraph/conf/GiraphConstants.java | 14 ++
.../conf/ImmutableClassesGiraphConfiguration.java | 40 +++++-
.../org/apache/giraph/graph/GraphTaskManager.java | 2 +
.../giraph/io/filters/DefaultEdgeInputFilter.java | 39 +++++
.../io/filters/DefaultVertexInputFilter.java | 41 +++++
.../apache/giraph/io/filters/EdgeInputFilter.java | 41 +++++
.../giraph/io/filters/VertexInputFilter.java | 42 +++++
.../org/apache/giraph/io/filters/package-info.java | 21 +++
.../org/apache/giraph/metrics/MetricNames.java | 10 ++
.../giraph/worker/EdgeInputSplitsCallable.java | 42 +++++-
.../apache/giraph/worker/InputSplitsCallable.java | 57 +++++++
.../giraph/worker/VertexInputSplitsCallable.java | 34 ++++-
.../java/org/apache/giraph/io/TestEdgeInput.java | 30 +---
.../java/org/apache/giraph/io/TestFilters.java | 119 +++++++++++++++
.../giraph/vertices/IntIntNullVertexDoNothing.java | 25 +++
.../apache/giraph/vertices/VertexCountEdges.java | 33 ++++
.../apache/giraph/vertices/VertexDoNothing.java | 33 ++++
20 files changed, 643 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b8318b4..aa1ce1e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.0.1 - unreleased
+ GIRAPH-560: Input filtering (nitay)
+
GIRAPH-621: Website Documentation: Basic Design Document (aching)
GIRAPH-658: Remove final modifier from SimpleHiveToEdge.initializeRecords
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 4a0e8f7..10e4975 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -27,7 +27,11 @@ import org.apache.giraph.graph.DefaultVertexValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
+import org.apache.giraph.io.filters.DefaultVertexInputFilter;
+import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.master.DefaultMasterCompute;
@@ -108,6 +112,12 @@ public class GiraphClasses<I extends WritableComparable,
/** Partition class - cached for fast accesss */
protected Class<? extends Partition<I, V, E, M>> partitionClass;
+ /** Edge Input Filter class */
+ protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
+ /** Vertex Input Filter class */
+ protected Class<? extends VertexInputFilter<I, V, E, M>>
+ vertexInputFilterClass;
+
/**
* Empty constructor. Initialize with default classes or null.
*/
@@ -131,6 +141,10 @@ public class GiraphClasses<I extends WritableComparable,
masterComputeClass = DefaultMasterCompute.class;
partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object)
SimplePartition.class;
+ edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
+ (Object) DefaultEdgeInputFilter.class;
+ vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+ (Object) DefaultVertexInputFilter.class;
}
/**
@@ -185,6 +199,11 @@ public class GiraphClasses<I extends WritableComparable,
masterComputeClass = MASTER_COMPUTE_CLASS.get(conf);
partitionClass = (Class<? extends Partition<I, V, E, M>>)
PARTITION_CLASS.get(conf);
+
+ edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
+ EDGE_INPUT_FILTER_CLASS.get(conf);
+ vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+ VERTEX_INPUT_FILTER_CLASS.get(conf);
}
/**
@@ -269,6 +288,16 @@ public class GiraphClasses<I extends WritableComparable,
return graphPartitionerFactoryClass;
}
+ /**
+ * Get the EdgeInputFilter class.
+ *
+ * @return EdgeInputFilter class (DefaultEdgeInputFilter unless configured
+ * through giraph.edgeInputFilterClass)
+ */
+ public Class<? extends EdgeInputFilter<I, E>>
+ getEdgeInputFilterClass() {
+ return edgeInputFilterClass;
+ }
+
+ /**
+ * Get the VertexInputFilter class.
+ *
+ * @return VertexInputFilter class (DefaultVertexInputFilter unless
+ * configured through giraph.vertexInputFilterClass)
+ */
+ public Class<? extends VertexInputFilter<I, V, E, M>>
+ getVertexInputFilterClass() {
+ return vertexInputFilterClass;
+ }
+
/**
* Check if VertexInputFormat class is set
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 8d74626..754fad9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -28,6 +28,8 @@ import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
@@ -106,6 +108,26 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Set the edge input filter class. The class is stored under the
+ * "giraph.edgeInputFilterClass" option (EDGE_INPUT_FILTER_CLASS) and
+ * decides which edges are dropped while edge input is loaded.
+ *
+ * @param edgeFilterClass class to use
+ */
+ public void setEdgeInputFilterClass(
+ Class<? extends EdgeInputFilter> edgeFilterClass) {
+ EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
+ }
+
+ /**
+ * Set the vertex input filter class. The class is stored under the
+ * "giraph.vertexInputFilterClass" option (VERTEX_INPUT_FILTER_CLASS) and
+ * decides which vertices are dropped while vertex input is loaded.
+ *
+ * @param vertexFilterClass class to use
+ */
+ public void setVertexInputFilterClass(
+ Class<? extends VertexInputFilter> vertexFilterClass) {
+ VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
+ }
+
+ /**
* Get the vertex edges class
*
* @return vertex edges class
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 54a40b7..bbf50e5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -30,6 +30,10 @@ import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
+import org.apache.giraph.io.filters.DefaultVertexInputFilter;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.job.DefaultJobObserver;
import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.master.DefaultMasterCompute;
@@ -120,6 +124,16 @@ public interface GiraphConstants {
ClassConfOption.create("giraph.edgeInputFormatClass", null,
EdgeInputFormat.class);
+ /**
+ * EdgeInputFilter class. Defaults to DefaultEdgeInputFilter, which keeps
+ * every edge.
+ */
+ ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS =
+ ClassConfOption.create("giraph.edgeInputFilterClass",
+ DefaultEdgeInputFilter.class, EdgeInputFilter.class);
+
+ /**
+ * VertexInputFilter class. Defaults to DefaultVertexInputFilter, which
+ * keeps every vertex.
+ */
+ ClassConfOption<VertexInputFilter> VERTEX_INPUT_FILTER_CLASS =
+ ClassConfOption.create("giraph.vertexInputFilterClass",
+ DefaultVertexInputFilter.class, VertexInputFilter.class);
+
/** VertexOutputFormat class */
ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
ClassConfOption.create("giraph.vertexOutputFormatClass", null,
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f992b37..a9add4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -31,6 +31,8 @@ import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
import org.apache.giraph.io.internal.WrappedVertexInputFormat;
import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
@@ -93,7 +95,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
*/
public ImmutableClassesGiraphConfiguration(Configuration conf) {
super(conf);
- classes = new GiraphClasses(conf);
+ classes = new GiraphClasses<I, V, E, M>(conf);
useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
try {
vertexValueFactory = (VertexValueFactory<V>)
@@ -138,6 +140,42 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get the edge input filter class
+ *
+ * @return EdgeInputFilter class
+ */
+ public Class<? extends EdgeInputFilter<I, E>>
+ getEdgeInputFilterClass() {
+ return classes.getEdgeInputFilterClass();
+ }
+
+ /**
+ * Get the edge input filter to use. Instantiates the configured
+ * EdgeInputFilter class via ReflectionUtils.newInstance, passing this
+ * configuration (presumably a fresh instance per call - confirm).
+ *
+ * @return EdgeInputFilter, typed to this job's vertex id and edge value
+ */
+ public EdgeInputFilter<I, E> getEdgeInputFilter() {
+ return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
+ }
+
+ /**
+ * Get the vertex input filter class
+ *
+ * @return VertexInputFilter class
+ */
+ public Class<? extends VertexInputFilter<I, V, E, M>>
+ getVertexInputFilterClass() {
+ return classes.getVertexInputFilterClass();
+ }
+
+ /**
+ * Get the vertex input filter to use. Instantiates the configured
+ * VertexInputFilter class via ReflectionUtils.newInstance, passing this
+ * configuration (presumably a fresh instance per call - confirm).
+ *
+ * @return VertexInputFilter, typed to this job's I/V/E/M
+ */
+ public VertexInputFilter<I, V, E, M> getVertexInputFilter() {
+ return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
+ }
+
+ /**
* Get the user's subclassed
* {@link org.apache.giraph.partition.GraphPartitionerFactory}.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 97cf55d..5dbf977 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -42,6 +42,7 @@ import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.InputSplitsCallable;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
@@ -337,6 +338,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
GiraphMetrics.get().addSuperstepResetObserver(this);
initJobMetrics();
MemoryUtils.initMetrics();
+ InputSplitsCallable.initMetrics();
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
new file mode 100644
index 0000000..ad52496
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default edge filter that lets in all edges: {@link #dropEdge} always
+ * returns false, so no edge is ever dropped. This is the default class for
+ * the giraph.edgeInputFilterClass option.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class DefaultEdgeInputFilter<I extends WritableComparable,
+ E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
+ implements EdgeInputFilter<I, E> {
+ @Override
+ public boolean dropEdge(I sourceId, Edge<I, E> edge) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
new file mode 100644
index 0000000..2976cbc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default vertex filter that lets in all vertices: {@link #dropVertex}
+ * always returns false, so no vertex is ever dropped. This is the default
+ * class for the giraph.vertexInputFilterClass option.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public class DefaultVertexInputFilter<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
+ implements VertexInputFilter<I, V, E, M> {
+ @Override
+ public boolean dropVertex(Vertex<I, V, E, M> vertex) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java
new file mode 100644
index 0000000..fa69932
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Filters edges on input. The implementation is selected with the
+ * giraph.edgeInputFilterClass option; the default, DefaultEdgeInputFilter,
+ * keeps every edge.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public interface EdgeInputFilter<I extends WritableComparable,
+ E extends Writable> {
+ /**
+ * Whether to drop this edge. EdgeInputSplitsCallable consults this for
+ * edges it reads; a dropped edge is not sent via sendEdgeRequest and is
+ * counted in the edges-filtered metric.
+ *
+ * @param sourceId ID of source of edge
+ * @param edge to check
+ * @return true if we should drop the edge, false to keep it
+ */
+ boolean dropEdge(I sourceId, Edge<I, E> edge);
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
new file mode 100644
index 0000000..d9af103
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Filters vertices on input. The implementation is selected with the
+ * giraph.vertexInputFilterClass option; the default,
+ * DefaultVertexInputFilter, keeps every vertex.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public interface VertexInputFilter<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Whether to drop a vertex on input. VertexInputSplitsCallable obtains
+ * this filter from the configuration; dropped vertices are presumably
+ * reported through the vertices-filtered metric (confirm in the loader).
+ *
+ * @param vertex to check
+ * @return true if we should drop vertex, false to keep it
+ */
+ boolean dropVertex(Vertex<I, V, E, M> vertex);
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java
new file mode 100644
index 0000000..a8f901a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input filters: hooks (EdgeInputFilter, VertexInputFilter) for dropping
+ * edges and vertices while graph input is loaded.
+ */
+package org.apache.giraph.io.filters;
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
index 52a3d15..cc237ac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
@@ -78,4 +78,14 @@ public interface MetricNames {
/** PercentGauge of memory free */
String MEMORY_FREE_PERCENT = "memory-free-pct";
+
+ /** Total edges filtered out by the EdgeInputFilter */
+ String EDGES_FILTERED = "edges-filtered";
+ /** Percent of edges filtered out (edges filtered / edges loaded) */
+ String EDGES_FILTERED_PCT = "edges-filtered-pct";
+
+ /** Total vertices filtered out by the VertexInputFilter */
+ String VERTICES_FILTERED = "vertices-filtered";
+ /** Percent of vertices filtered out (vertices filtered / loaded) */
+ String VERTICES_FILTERED_PCT = "vertices-filtered-pct";
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 83fe5ea..351a114 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.zk.ZooKeeperExt;
@@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
import java.io.IOException;
@@ -52,7 +54,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends InputSplitsCallable<I, V, E, M> {
/** How often to update metrics and print info */
- public static final int VERTICES_UPDATE_PERIOD = 1000000;
+ public static final int EDGES_UPDATE_PERIOD = 1000000;
+ /** How often to update filtered metrics */
+ public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000;
/** Class logger */
private static final Logger LOG = Logger.getLogger(
@@ -62,9 +66,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
/** Input split max edges (-1 denotes all) */
private final long inputSplitMaxEdges;
+ /** Filter to use */
+ private final EdgeInputFilter<I, E> edgeInputFilter;
+
// Metrics
/** edges loaded meter across all readers */
private final Meter totalEdgesMeter;
+ /** edges filtered out by user */
+ private final Counter totalEdgesFiltered;
/**
* Constructor.
@@ -90,9 +99,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
this.edgeInputFormat = edgeInputFormat;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+ edgeInputFilter = configuration.getEdgeInputFilter();
// Initialize Metrics
totalEdgesMeter = getTotalEdgesLoadedMeter();
+ totalEdgesFiltered = getTotalEdgesFilteredCounter();
}
@Override
@@ -121,7 +132,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
(ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>)
configuration);
edgeReader.initialize(inputSplit, context);
+
long inputSplitEdgesLoaded = 0;
+ long inputSplitEdgesFiltered = 0;
+
while (edgeReader.nextEdge()) {
I sourceId = edgeReader.getCurrentSourceId();
Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
@@ -141,14 +155,24 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
"without a value! - " + readerEdge);
}
- graphState.getWorkerClientRequestProcessor().sendEdgeRequest(
- sourceId, readerEdge);
- context.progress(); // do this before potential data transfer
++inputSplitEdgesLoaded;
- // Update status every VERTICES_UPDATE_PERIOD edges
- if (inputSplitEdgesLoaded % VERTICES_UPDATE_PERIOD == 0) {
- totalEdgesMeter.mark(VERTICES_UPDATE_PERIOD);
+ if (edgeInputFilter.dropEdge(sourceId, readerEdge)) {
+ ++inputSplitEdgesFiltered;
+ if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) {
+ totalEdgesFiltered.inc(inputSplitEdgesFiltered);
+ inputSplitEdgesFiltered = 0;
+ }
+ continue;
+ }
+
+ graphState.getWorkerClientRequestProcessor().sendEdgeRequest(sourceId,
+ readerEdge);
+ context.progress(); // do this before potential data transfer
+
+ // Update status every EDGES_UPDATE_PERIOD edges
+ if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
+ totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
"readEdgeInputSplit: Loaded " +
totalEdgesMeter.count() + " edges at " +
@@ -169,6 +193,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
}
}
edgeReader.close();
+
+ totalEdgesFiltered.inc(inputSplitEdgesFiltered);
+ totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
+
return new VertexEdgeCount(0, inputSplitEdgesLoaded);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index f7a8340..a8298c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -25,7 +25,9 @@ import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MeterDesc;
+import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
@@ -38,7 +40,9 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
+import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.util.PercentGauge;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -135,6 +139,16 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
}
/**
+ * Get the per-job Counter tracking edges dropped by the EdgeInputFilter,
+ * registered under MetricNames.EDGES_FILTERED ("edges-filtered").
+ *
+ * @return Counter tracking edges filtered
+ */
+ public static Counter getTotalEdgesFilteredCounter() {
+ return GiraphMetrics.get().perJobRequired()
+ .getCounter(MetricNames.EDGES_FILTERED);
+ }
+
+ /**
* Get Meter tracking number of vertices loaded.
*
* @return Meter for vertices loaded
@@ -145,6 +159,49 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
}
/**
+ * Get the per-job Counter tracking vertices dropped by the
+ * VertexInputFilter, registered under MetricNames.VERTICES_FILTERED
+ * ("vertices-filtered").
+ *
+ * @return Counter tracking vertices filtered
+ */
+ public static Counter getTotalVerticesFilteredCounter() {
+ return GiraphMetrics.get().perJobRequired()
+ .getCounter(MetricNames.VERTICES_FILTERED);
+ }
+
+ /**
+ * Initialize metrics used by this class and its subclasses.
+ * Registers two PercentGauges in the per-job registry:
+ * edges-filtered-pct (edges-filtered counter / edges-loaded meter) and
+ * vertices-filtered-pct (vertices-filtered counter / vertices-loaded
+ * meter). Invoked from GraphTaskManager next to MemoryUtils.initMetrics().
+ *
+ * NOTE(review): the ratios are only meaningful if the loaded meters count
+ * every record read, including filtered ones - confirm against the
+ * metering in the input split callables.
+ */
+ public static void initMetrics() {
+ GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();
+
+ final Counter edgesFiltered = getTotalEdgesFilteredCounter();
+ final Meter edgesLoaded = getTotalEdgesLoadedMeter();
+
+ metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
+ @Override protected double getNumerator() {
+ return edgesFiltered.count();
+ }
+
+ @Override protected double getDenominator() {
+ return edgesLoaded.count();
+ }
+ });
+
+ final Counter verticesFiltered = getTotalVerticesFilteredCounter();
+ final Meter verticesLoaded = getTotalVerticesLoadedMeter();
+
+ metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
+ @Override protected double getNumerator() {
+ return verticesFiltered.count();
+ }
+
+ @Override protected double getDenominator() {
+ return verticesLoaded.count();
+ }
+ });
+ }
+
+ /**
* Load vertices/edges from the given input split.
*
* @param inputSplit Input split to load
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index d32ccaf..1c292ad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -20,14 +20,15 @@ package org.apache.giraph.worker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -36,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
import java.io.IOException;
@@ -55,6 +57,9 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
extends InputSplitsCallable<I, V, E, M> {
/** How often to update metrics and print info */
public static final int VERTICES_UPDATE_PERIOD = 250000;
+ /** How often to update filtered out metrics */
+ public static final int VERTICES_FILTERED_UPDATE_PERIOD = 2500;
+
/** Class logger */
private static final Logger LOG =
Logger.getLogger(VertexInputSplitsCallable.class);
@@ -64,10 +69,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
private final long inputSplitMaxVertices;
/** Bsp service worker (only use thread-safe methods) */
private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+ /** Filter to select which vertices to keep */
+ private final VertexInputFilter<I, V, E, M> vertexInputFilter;
// Metrics
/** number of vertices loaded meter across all readers */
private final Meter totalVerticesMeter;
+ /** number of vertices filtered out */
+ private final Counter totalVerticesFilteredCounter;
/** number of edges loaded meter across all readers */
private final Meter totalEdgesMeter;
@@ -96,9 +105,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
this.bspServiceWorker = bspServiceWorker;
+ vertexInputFilter = configuration.getVertexInputFilter();
// Initialize Metrics
totalVerticesMeter = getTotalVerticesLoadedMeter();
+ totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
totalEdgesMeter = getTotalEdgesLoadedMeter();
}
@@ -127,9 +138,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
vertexReader.setConf(
(ImmutableClassesGiraphConfiguration<I, V, E, Writable>) configuration);
vertexReader.initialize(inputSplit, context);
+
long inputSplitVerticesLoaded = 0;
+ long inputSplitVerticesFiltered = 0;
+
long edgesSinceLastUpdate = 0;
long inputSplitEdgesLoaded = 0;
+
while (vertexReader.nextVertex()) {
Vertex<I, V, E, M> readerVertex =
(Vertex<I, V, E, M>) vertexReader.getCurrentVertex();
@@ -144,12 +159,22 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
readerVertex.setConf(configuration);
readerVertex.setGraphState(graphState);
+ ++inputSplitVerticesLoaded;
+
+ if (vertexInputFilter.dropVertex(readerVertex)) {
+ ++inputSplitVerticesFiltered;
+ if (inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) {
+ totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
+ inputSplitVerticesFiltered = 0;
+ }
+ continue;
+ }
+
PartitionOwner partitionOwner =
bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
graphState.getWorkerClientRequestProcessor().sendVertexRequest(
partitionOwner, readerVertex);
context.progress(); // do this before potential data transfer
- ++inputSplitVerticesLoaded;
edgesSinceLastUpdate += readerVertex.getNumEdges();
// Update status every VERTICES_UPDATE_PERIOD vertices
@@ -181,6 +206,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
break;
}
}
+
+ totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
+ totalEdgesMeter.mark(edgesSinceLastUpdate);
+ totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
+
vertexReader.close();
return new VertexEdgeCount(inputSplitVerticesLoaded,
inputSplitEdgesLoaded + edgesSinceLastUpdate);
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
index 07d4cc8..cb1a8da 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
@@ -23,13 +23,14 @@ import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.vertices.IntIntNullVertexDoNothing;
+import org.apache.giraph.vertices.VertexCountEdges;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.junit.Test;
@@ -64,7 +65,7 @@ public class TestEdgeInput extends BspCase {
};
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(TestVertexWithNumEdges.class);
+ conf.setVertexClass(VertexCountEdges.class);
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -93,7 +94,7 @@ public class TestEdgeInput extends BspCase {
};
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(TestVertexWithNumEdges.class);
+ conf.setVertexClass(VertexCountEdges.class);
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -129,7 +130,7 @@ public class TestEdgeInput extends BspCase {
};
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(TestVertexDoNothing.class);
+ conf.setVertexClass(IntIntNullVertexDoNothing.class);
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -160,7 +161,7 @@ public class TestEdgeInput extends BspCase {
assertEquals(3, (int) values.get(5));
conf = new GiraphConfiguration();
- conf.setVertexClass(TestVertexWithNumEdges.class);
+ conf.setVertexClass(VertexCountEdges.class);
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -209,16 +210,7 @@ public class TestEdgeInput extends BspCase {
assertEquals(0, (int) values.get(4));
}
- public static class TestVertexWithNumEdges extends Vertex<IntWritable,
- IntWritable, NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException {
- setValue(new IntWritable(getNumEdges()));
- voteToHalt();
- }
- }
-
- public static class TestVertexCheckEdgesType extends TestVertexWithNumEdges {
+ public static class TestVertexCheckEdgesType extends VertexCountEdges {
@Override
public void compute(Iterable<NullWritable> messages) throws IOException {
assertFalse(getEdges() instanceof TestOutEdgesFilterEven);
@@ -227,14 +219,6 @@ public class TestEdgeInput extends BspCase {
}
}
- public static class TestVertexDoNothing extends Vertex<IntWritable,
- IntWritable, NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException {
- voteToHalt();
- }
- }
-
public static class TestVertexValueFactory
implements VertexValueFactory<IntWritable> {
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
new file mode 100644
index 0000000..83a366d
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.vertices.IntIntNullVertexDoNothing;
+import org.apache.giraph.vertices.VertexCountEdges;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * End-to-end tests for {@link EdgeInputFilter} and {@link VertexInputFilter}:
+ * each test runs a small job through {@link InternalVertexRunner} and checks
+ * that the filtered-out input does not show up in the output.
+ */
+public class TestFilters extends BspCase {
+  public TestFilters() {
+    super(TestFilters.class.getName());
+  }
+
+  /** Drops every input edge whose source vertex id is 2. */
+  public static class EdgeFilter
+      implements EdgeInputFilter<IntWritable, NullWritable> {
+    @Override
+    public boolean dropEdge(IntWritable sourceId,
+        Edge<IntWritable, NullWritable> edge) {
+      return sourceId.get() == 2;
+    }
+  }
+
+  @Test
+  public void testEdgeFilter() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setEdgeInputFilterClass(EdgeFilter.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Both edges out of vertex 2 are dropped, so only vertices 1 and 4
+    // (one remaining edge each) are expected in the output.
+    assertEquals(2, values.size());
+    assertEquals(1, (int) values.get(1));
+    assertEquals(1, (int) values.get(4));
+  }
+
+  /**
+   * Drops vertices 2 and 3. Its type parameters match
+   * {@link IntIntNullVertexDoNothing} (IntWritable ids and values,
+   * NullWritable edge values and messages), the vertex class under test.
+   */
+  public static class VertexFilter implements VertexInputFilter<IntWritable,
+      IntWritable, NullWritable, NullWritable> {
+    @Override
+    public boolean dropVertex(Vertex<IntWritable, IntWritable, NullWritable,
+        NullWritable> vertex) {
+      int id = vertex.getId().get();
+      return id == 2 || id == 3;
+    }
+  }
+
+  @Test
+  public void testVertexFilter() throws Exception {
+    String[] vertices = new String[] {
+        "1 1",
+        "2 2",
+        "3 3",
+        "4 4"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexClass(IntIntNullVertexDoNothing.class);
+    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+    conf.setVertexInputFilterClass(VertexFilter.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, vertices);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Vertices 2 and 3 are filtered out; the survivors keep their values.
+    assertEquals(2, values.size());
+    assertEquals(1, (int) values.get(1));
+    assertEquals(4, (int) values.get(4));
+  }
+
+  /**
+   * Parses job output lines of the form "id value" (whitespace-separated)
+   * into a map from vertex id to vertex value.
+   *
+   * @param results output lines produced by the job
+   * @return map of vertex id to integer vertex value
+   */
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> values = Maps.newHashMap();
+    for (String line : results) {
+      String[] tokens = line.split("\\s+");
+      int id = Integer.parseInt(tokens[0]);
+      int value = Integer.parseInt(tokens[1]);
+      values.put(id, value);
+    }
+    return values;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
new file mode 100644
index 0000000..c98d580
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertices;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * {@link VertexDoNothing} fixed to IntWritable ids and values, with
+ * NullWritable edge values and messages.
+ */
+public class IntIntNullVertexDoNothing
+    extends VertexDoNothing<IntWritable, IntWritable, NullWritable,
+        NullWritable> {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
new file mode 100644
index 0000000..9060bc7
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertices;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Test vertex that sets its value to its current number of edges and then
+ * votes to halt.
+ */
+public class VertexCountEdges
+    extends Vertex<IntWritable, IntWritable, NullWritable, NullWritable> {
+  @Override
+  public void compute(Iterable<NullWritable> messages) throws IOException {
+    int edgeCount = getNumEdges();
+    setValue(new IntWritable(edgeCount));
+    voteToHalt();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
new file mode 100644
index 0000000..fac3fce
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertices;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Generic test vertex whose compute does nothing except vote to halt.
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge value type
+ * @param <M> message type
+ */
+public class VertexDoNothing<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Vertex<I, V, E, M> {
+  @Override
+  public void compute(Iterable<M> messages) throws IOException {
+    // Messages are ignored; the vertex halts right away.
+    voteToHalt();
+  }
+}