You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/05/07 03:00:53 UTC

git commit: updated refs/heads/trunk to afb3ecc

Updated Branches:
  refs/heads/trunk e495238bb -> afb3ecce1


GIRAPH-560: Input filtering (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/afb3ecce
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/afb3ecce
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/afb3ecce

Branch: refs/heads/trunk
Commit: afb3ecce139ab7c4037602ad143b5e6424dc48a2
Parents: e495238
Author: Nitay Joffe <ni...@apache.org>
Authored: Mon May 6 20:57:19 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Mon May 6 21:00:45 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../java/org/apache/giraph/conf/GiraphClasses.java |   29 ++++
 .../apache/giraph/conf/GiraphConfiguration.java    |   22 +++
 .../org/apache/giraph/conf/GiraphConstants.java    |   14 ++
 .../conf/ImmutableClassesGiraphConfiguration.java  |   40 +++++-
 .../org/apache/giraph/graph/GraphTaskManager.java  |    2 +
 .../giraph/io/filters/DefaultEdgeInputFilter.java  |   39 +++++
 .../io/filters/DefaultVertexInputFilter.java       |   41 +++++
 .../apache/giraph/io/filters/EdgeInputFilter.java  |   41 +++++
 .../giraph/io/filters/VertexInputFilter.java       |   42 +++++
 .../org/apache/giraph/io/filters/package-info.java |   21 +++
 .../org/apache/giraph/metrics/MetricNames.java     |   10 ++
 .../giraph/worker/EdgeInputSplitsCallable.java     |   42 +++++-
 .../apache/giraph/worker/InputSplitsCallable.java  |   57 +++++++
 .../giraph/worker/VertexInputSplitsCallable.java   |   34 ++++-
 .../java/org/apache/giraph/io/TestEdgeInput.java   |   30 +---
 .../java/org/apache/giraph/io/TestFilters.java     |  119 +++++++++++++++
 .../giraph/vertices/IntIntNullVertexDoNothing.java |   25 +++
 .../apache/giraph/vertices/VertexCountEdges.java   |   33 ++++
 .../apache/giraph/vertices/VertexDoNothing.java    |   33 ++++
 20 files changed, 643 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b8318b4..aa1ce1e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-560: Input filtering (nitay)
+
   GIRAPH-621: Website Documentation: Basic Design Document (aching)
 
   GIRAPH-658: Remove final modifier from SimpleHiveToEdge.initializeRecords

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 4a0e8f7..10e4975 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -27,7 +27,11 @@ import org.apache.giraph.graph.DefaultVertexValueFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
+import org.apache.giraph.io.filters.DefaultVertexInputFilter;
+import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.master.DefaultMasterCompute;
@@ -108,6 +112,12 @@ public class GiraphClasses<I extends WritableComparable,
   /** Partition class - cached for fast accesss */
   protected Class<? extends Partition<I, V, E, M>> partitionClass;
 
+  /** Edge Input Filter class */
+  protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
+  /** Vertex Input Filter class */
+  protected Class<? extends VertexInputFilter<I, V, E, M>>
+  vertexInputFilterClass;
+
   /**
    * Empty constructor. Initialize with default classes or null.
    */
@@ -131,6 +141,10 @@ public class GiraphClasses<I extends WritableComparable,
     masterComputeClass = DefaultMasterCompute.class;
     partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object)
         SimplePartition.class;
+    edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
+        (Object) DefaultEdgeInputFilter.class;
+    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+        (Object) DefaultVertexInputFilter.class;
   }
 
   /**
@@ -185,6 +199,11 @@ public class GiraphClasses<I extends WritableComparable,
     masterComputeClass =  MASTER_COMPUTE_CLASS.get(conf);
     partitionClass = (Class<? extends Partition<I, V, E, M>>)
         PARTITION_CLASS.get(conf);
+
+    edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>)
+        EDGE_INPUT_FILTER_CLASS.get(conf);
+    vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>)
+        VERTEX_INPUT_FILTER_CLASS.get(conf);
   }
 
   /**
@@ -269,6 +288,16 @@ public class GiraphClasses<I extends WritableComparable,
     return graphPartitionerFactoryClass;
   }
 
+  public Class<? extends EdgeInputFilter<I, E>>
+  getEdgeInputFilterClass() {
+    return edgeInputFilterClass;
+  }
+
+  public Class<? extends VertexInputFilter<I, V, E, M>>
+  getVertexInputFilterClass() {
+    return vertexInputFilterClass;
+  }
+
   /**
    * Check if VertexInputFormat class is set
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 8d74626..754fad9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -28,6 +28,8 @@ import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
@@ -106,6 +108,26 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Set the edge input filter class
+   *
+   * @param edgeFilterClass class to use
+   */
+  public void setEdgeInputFilterClass(
+      Class<? extends EdgeInputFilter> edgeFilterClass) {
+    EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass);
+  }
+
+  /**
+   * Set the vertex input filter class
+   *
+   * @param vertexFilterClass class to use
+   */
+  public void setVertexInputFilterClass(
+      Class<? extends VertexInputFilter> vertexFilterClass) {
+    VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass);
+  }
+
+  /**
    * Get the vertex edges class
    *
    * @return vertex edges class

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 54a40b7..bbf50e5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -30,6 +30,10 @@ import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
+import org.apache.giraph.io.filters.DefaultVertexInputFilter;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.job.DefaultJobObserver;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.master.DefaultMasterCompute;
@@ -120,6 +124,16 @@ public interface GiraphConstants {
       ClassConfOption.create("giraph.edgeInputFormatClass", null,
           EdgeInputFormat.class);
 
+  /** EdgeInputFilter class */
+  ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS =
+      ClassConfOption.create("giraph.edgeInputFilterClass",
+          DefaultEdgeInputFilter.class, EdgeInputFilter.class);
+
+  /** VertexInputFilter class */
+  ClassConfOption<VertexInputFilter> VERTEX_INPUT_FILTER_CLASS =
+      ClassConfOption.create("giraph.vertexInputFilterClass",
+          DefaultVertexInputFilter.class, VertexInputFilter.class);
+
   /** VertexOutputFormat class */
   ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS =
       ClassConfOption.create("giraph.vertexOutputFormatClass", null,

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f992b37..a9add4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -31,6 +31,8 @@ import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
@@ -93,7 +95,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    */
   public ImmutableClassesGiraphConfiguration(Configuration conf) {
     super(conf);
-    classes = new GiraphClasses(conf);
+    classes = new GiraphClasses<I, V, E, M>(conf);
     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
     try {
       vertexValueFactory = (VertexValueFactory<V>)
@@ -138,6 +140,42 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get the edge input filter class
+   *
+   * @return EdgeInputFilter class
+   */
+  public Class<? extends EdgeInputFilter<I, E>>
+  getEdgeInputFilterClass() {
+    return classes.getEdgeInputFilterClass();
+  }
+
+  /**
+   * Get the edge input filter to use
+   * @return EdgeInputFilter
+   */
+  public EdgeInputFilter getEdgeInputFilter() {
+    return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this);
+  }
+
+  /**
+   * Get the vertex input filter class
+   *
+   * @return VertexInputFilter class
+   */
+  public Class<? extends VertexInputFilter<I, V, E, M>>
+  getVertexInputFilterClass() {
+    return classes.getVertexInputFilterClass();
+  }
+
+  /**
+   * Get the vertex input filter to use
+   * @return VertexInputFilter
+   */
+  public VertexInputFilter getVertexInputFilter() {
+    return ReflectionUtils.newInstance(getVertexInputFilterClass(), this);
+  }
+
+  /**
    * Get the user's subclassed
    * {@link org.apache.giraph.partition.GraphPartitionerFactory}.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 97cf55d..5dbf977 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -42,6 +42,7 @@ import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.InputSplitsCallable;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
@@ -337,6 +338,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     GiraphMetrics.get().addSuperstepResetObserver(this);
     initJobMetrics();
     MemoryUtils.initMetrics();
+    InputSplitsCallable.initMetrics();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
new file mode 100644
index 0000000..ad52496
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default edge filter that lets in all edges.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class DefaultEdgeInputFilter<I extends WritableComparable,
+    E extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
+    implements EdgeInputFilter<I, E> {
+  @Override
+  public boolean dropEdge(I sourceId, Edge<I, E> edge) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
new file mode 100644
index 0000000..2976cbc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default vertex filter that lets in all vertices.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public class DefaultVertexInputFilter<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
+    implements VertexInputFilter<I, V, E, M> {
+  @Override
+  public boolean dropVertex(Vertex<I, V, E, M> vertex) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java
new file mode 100644
index 0000000..fa69932
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Filters edges on input.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public interface EdgeInputFilter<I extends WritableComparable,
+    E extends Writable> {
+  /**
+   * Whether to drop this edge
+   *
+   * @param sourceId ID of source of edge
+   * @param edge to check
+   * @return true if we should drop the edge
+   */
+  boolean dropEdge(I sourceId, Edge<I, E> edge);
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
new file mode 100644
index 0000000..d9af103
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.filters;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Filters vertices on input.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M> Message Value
+ */
+public interface VertexInputFilter<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Whether to drop a vertex on input.
+   *
+   * @param vertex to check
+   * @return true if we should drop vertex
+   */
+  boolean dropVertex(Vertex<I, V, E, M> vertex);
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java
new file mode 100644
index 0000000..a8f901a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Input/Output filters.
+ */
+package org.apache.giraph.io.filters;

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
index 52a3d15..cc237ac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
@@ -78,4 +78,14 @@ public interface MetricNames {
 
   /** PercentGauge of memory free */
   String MEMORY_FREE_PERCENT = "memory-free-pct";
+
+  /** Total edges filtered */
+  String EDGES_FILTERED = "edges-filtered";
+  /** Percent of edges filtered out */
+  String EDGES_FILTERED_PCT = "edges-filtered-pct";
+
+  /** Total vertices filtered */
+  String VERTICES_FILTERED = "vertices-filtered";
+  /** Percent of vertices filtered out */
+  String VERTICES_FILTERED_PCT = "vertices-filtered-pct";
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 83fe5ea..351a114 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
@@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 
 import java.io.IOException;
@@ -52,7 +54,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends InputSplitsCallable<I, V, E, M> {
   /** How often to update metrics and print info */
-  public static final int VERTICES_UPDATE_PERIOD = 1000000;
+  public static final int EDGES_UPDATE_PERIOD = 1000000;
+  /** How often to update filtered metrics */
+  public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000;
 
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(
@@ -62,9 +66,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
   /** Input split max edges (-1 denotes all) */
   private final long inputSplitMaxEdges;
 
+  /** Filter to use */
+  private final EdgeInputFilter<I, E> edgeInputFilter;
+
   // Metrics
   /** edges loaded meter across all readers */
   private final Meter totalEdgesMeter;
+  /** edges filtered out by user */
+  private final Counter totalEdgesFiltered;
 
   /**
    * Constructor.
@@ -90,9 +99,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
     this.edgeInputFormat = edgeInputFormat;
 
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+    edgeInputFilter = configuration.getEdgeInputFilter();
 
     // Initialize Metrics
     totalEdgesMeter = getTotalEdgesLoadedMeter();
+    totalEdgesFiltered = getTotalEdgesFilteredCounter();
   }
 
   @Override
@@ -121,7 +132,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
         (ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>)
             configuration);
     edgeReader.initialize(inputSplit, context);
+
     long inputSplitEdgesLoaded = 0;
+    long inputSplitEdgesFiltered = 0;
+
     while (edgeReader.nextEdge()) {
       I sourceId = edgeReader.getCurrentSourceId();
       Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
@@ -141,14 +155,24 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
                 "without a value!  - " + readerEdge);
       }
 
-      graphState.getWorkerClientRequestProcessor().sendEdgeRequest(
-          sourceId, readerEdge);
-      context.progress(); // do this before potential data transfer
       ++inputSplitEdgesLoaded;
 
-      // Update status every VERTICES_UPDATE_PERIOD edges
-      if (inputSplitEdgesLoaded % VERTICES_UPDATE_PERIOD == 0) {
-        totalEdgesMeter.mark(VERTICES_UPDATE_PERIOD);
+      if (edgeInputFilter.dropEdge(sourceId, readerEdge)) {
+        ++inputSplitEdgesFiltered;
+        if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) {
+          totalEdgesFiltered.inc(inputSplitEdgesFiltered);
+          inputSplitEdgesFiltered = 0;
+        }
+        continue;
+      }
+
+      graphState.getWorkerClientRequestProcessor().sendEdgeRequest(sourceId,
+          readerEdge);
+      context.progress(); // do this before potential data transfer
+
+      // Update status every EDGES_UPDATE_PERIOD edges
+      if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
+        totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
         LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
             "readEdgeInputSplit: Loaded " +
                 totalEdgesMeter.count() + " edges at " +
@@ -169,6 +193,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
       }
     }
     edgeReader.close();
+
+    totalEdgesFiltered.inc(inputSplitEdgesFiltered);
+    totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
+
     return new VertexEdgeCount(0, inputSplitEdgesLoaded);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index f7a8340..a8298c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -25,7 +25,9 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.metrics.MeterDesc;
+import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -38,7 +40,9 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
+import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.util.PercentGauge;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -135,6 +139,16 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   }
 
   /**
+   * Get Counter tracking edges filtered
+   *
+   * @return Counter tracking edges filtered
+   */
+  public static Counter getTotalEdgesFilteredCounter() {
+    return GiraphMetrics.get().perJobRequired()
+        .getCounter(MetricNames.EDGES_FILTERED);
+  }
+
+  /**
    * Get Meter tracking number of vertices loaded.
    *
    * @return Meter for vertices loaded
@@ -145,6 +159,49 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   }
 
   /**
+   * Get Counter tracking vertices filtered
+   *
+   * @return Counter tracking vertices filtered
+   */
+  public static Counter getTotalVerticesFilteredCounter() {
+    return GiraphMetrics.get().perJobRequired()
+        .getCounter(MetricNames.VERTICES_FILTERED);
+  }
+
+  /**
+   * Initialize metrics used by this class and its subclasses.
+   */
+  public static void initMetrics() {
+    GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();
+
+    final Counter edgesFiltered = getTotalEdgesFilteredCounter();
+    final Meter edgesLoaded = getTotalEdgesLoadedMeter();
+
+    metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
+      @Override protected double getNumerator() {
+        return edgesFiltered.count();
+      }
+
+      @Override protected double getDenominator() {
+        return edgesLoaded.count();
+      }
+    });
+
+    final Counter verticesFiltered = getTotalVerticesFilteredCounter();
+    final Meter verticesLoaded = getTotalVerticesLoadedMeter();
+
+    metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
+      @Override protected double getNumerator() {
+        return verticesFiltered.count();
+      }
+
+      @Override protected double getDenominator() {
+        return verticesLoaded.count();
+      }
+    });
+  }
+
+  /**
    * Load vertices/edges from the given input split.
    *
    * @param inputSplit Input split to load

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index d32ccaf..1c292ad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -20,14 +20,15 @@ package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -36,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Meter;
 
 import java.io.IOException;
@@ -55,6 +57,9 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     extends InputSplitsCallable<I, V, E, M> {
   /** How often to update metrics and print info */
   public static final int VERTICES_UPDATE_PERIOD = 250000;
+  /** How often to update filtered out metrics */
+  public static final int VERTICES_FILTERED_UPDATE_PERIOD = 2500;
+
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(VertexInputSplitsCallable.class);
@@ -64,10 +69,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   private final long inputSplitMaxVertices;
   /** Bsp service worker (only use thread-safe methods) */
   private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  /** Filter to select which vertices to keep */
+  private final VertexInputFilter<I, V, E, M> vertexInputFilter;
 
   // Metrics
   /** number of vertices loaded meter across all readers */
   private final Meter totalVerticesMeter;
+  /** number of vertices filtered out */
+  private final Counter totalVerticesFilteredCounter;
   /** number of edges loaded meter across all readers */
   private final Meter totalEdgesMeter;
 
@@ -96,9 +105,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
 
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
     this.bspServiceWorker = bspServiceWorker;
+    vertexInputFilter = configuration.getVertexInputFilter();
 
     // Initialize Metrics
     totalVerticesMeter = getTotalVerticesLoadedMeter();
+    totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
     totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
@@ -127,9 +138,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     vertexReader.setConf(
         (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) configuration);
     vertexReader.initialize(inputSplit, context);
+
     long inputSplitVerticesLoaded = 0;
+    long inputSplitVerticesFiltered = 0;
+
     long edgesSinceLastUpdate = 0;
     long inputSplitEdgesLoaded = 0;
+
     while (vertexReader.nextVertex()) {
       Vertex<I, V, E, M> readerVertex =
           (Vertex<I, V, E, M>) vertexReader.getCurrentVertex();
@@ -144,12 +159,22 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       readerVertex.setConf(configuration);
       readerVertex.setGraphState(graphState);
 
+      ++inputSplitVerticesLoaded;
+
+      if (vertexInputFilter.dropVertex(readerVertex)) {
+        ++inputSplitVerticesFiltered;
+        if (inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) {
+          totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
+          inputSplitVerticesFiltered = 0;
+        }
+        continue;
+      }
+
       PartitionOwner partitionOwner =
           bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
       graphState.getWorkerClientRequestProcessor().sendVertexRequest(
           partitionOwner, readerVertex);
       context.progress(); // do this before potential data transfer
-      ++inputSplitVerticesLoaded;
       edgesSinceLastUpdate += readerVertex.getNumEdges();
 
       // Update status every VERTICES_UPDATE_PERIOD vertices
@@ -181,6 +206,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
         break;
       }
     }
+
+    totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
+    totalEdgesMeter.mark(edgesSinceLastUpdate);
+    totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
+
     vertexReader.close();
     return new VertexEdgeCount(inputSplitVerticesLoaded,
         inputSplitEdgesLoaded + edgesSinceLastUpdate);

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
index 07d4cc8..cb1a8da 100644
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
@@ -23,13 +23,14 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
 import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.vertices.IntIntNullVertexDoNothing;
+import org.apache.giraph.vertices.VertexCountEdges;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
@@ -64,7 +65,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(TestVertexWithNumEdges.class);
+    conf.setVertexClass(VertexCountEdges.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -93,7 +94,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(TestVertexWithNumEdges.class);
+    conf.setVertexClass(VertexCountEdges.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -129,7 +130,7 @@ public class TestEdgeInput extends BspCase {
     };
 
     GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setVertexClass(TestVertexDoNothing.class);
+    conf.setVertexClass(IntIntNullVertexDoNothing.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -160,7 +161,7 @@ public class TestEdgeInput extends BspCase {
     assertEquals(3, (int) values.get(5));
 
     conf = new GiraphConfiguration();
-    conf.setVertexClass(TestVertexWithNumEdges.class);
+    conf.setVertexClass(VertexCountEdges.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
@@ -209,16 +210,7 @@ public class TestEdgeInput extends BspCase {
     assertEquals(0, (int) values.get(4));
   }
 
-  public static class TestVertexWithNumEdges extends Vertex<IntWritable,
-        IntWritable, NullWritable, NullWritable> {
-    @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      setValue(new IntWritable(getNumEdges()));
-      voteToHalt();
-    }
-  }
-
-  public static class TestVertexCheckEdgesType extends TestVertexWithNumEdges {
+  public static class TestVertexCheckEdgesType extends VertexCountEdges {
     @Override
     public void compute(Iterable<NullWritable> messages) throws IOException {
       assertFalse(getEdges() instanceof TestOutEdgesFilterEven);
@@ -227,14 +219,6 @@ public class TestEdgeInput extends BspCase {
     }
   }
 
-  public static class TestVertexDoNothing extends Vertex<IntWritable,
-        IntWritable, NullWritable, NullWritable> {
-    @Override
-    public void compute(Iterable<NullWritable> messages) throws IOException {
-      voteToHalt();
-    }
-  }
-
   public static class TestVertexValueFactory
       implements VertexValueFactory<IntWritable> {
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
new file mode 100644
index 0000000..83a366d
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.filters.EdgeInputFilter;
+import org.apache.giraph.io.filters.VertexInputFilter;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.vertices.IntIntNullVertexDoNothing;
+import org.apache.giraph.vertices.VertexCountEdges;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestFilters extends BspCase {
+  public TestFilters() {
+    super(TestFilters.class.getName());
+  }
+
+  /** Edge filter that drops every edge whose source id is 2. */
+  public static class EdgeFilter
+      implements EdgeInputFilter<IntWritable, NullWritable> {
+    @Override public boolean dropEdge(IntWritable sourceId,
+        Edge<IntWritable, NullWritable> edge) {
+      return sourceId.get() == 2;
+    }
+  }
+
+  @Test
+  public void testEdgeFilter() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexClass(VertexCountEdges.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setEdgeInputFilterClass(EdgeFilter.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+    Map<Integer, Integer> values = parseResults(results);
+
+    assertEquals(2, values.size());
+    assertEquals(1, (int) values.get(1));
+    assertEquals(1, (int) values.get(4));
+  }
+
+  /** Vertex filter that drops the vertices with ids 2 and 3. */
+  public static class VertexFilter implements VertexInputFilter<IntWritable,
+      IntWritable, NullWritable, NullWritable> {
+    @Override public boolean dropVertex(Vertex<IntWritable, IntWritable,
+        NullWritable, NullWritable> vertex) {
+      int id = vertex.getId().get();
+      return id == 2 || id == 3;
+    }
+  }
+
+  @Test
+  public void testVertexFilter() throws Exception {
+    String[] vertices = new String[] {
+        "1 1",
+        "2 2",
+        "3 3",
+        "4 4"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexClass(IntIntNullVertexDoNothing.class);
+    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+    conf.setVertexInputFilterClass(VertexFilter.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, vertices);
+    Map<Integer, Integer> values = parseResults(results);
+
+    assertEquals(2, values.size());
+    assertEquals(1, (int) values.get(1));
+    assertEquals(4, (int) values.get(4));
+  }
+
+  /** Parses whitespace-separated "id value" lines into an id -> value map. */
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> values = Maps.newHashMap();
+    for (String line : results) {
+      String[] tokens = line.split("\\s+");
+      values.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+    }
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
new file mode 100644
index 0000000..c98d580
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertices;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * {@link VertexDoNothing} with {@link IntWritable} ids and values, and
+ * {@link NullWritable} edge values and messages.
+ */
+public class IntIntNullVertexDoNothing extends
+    VertexDoNothing<IntWritable, IntWritable, NullWritable, NullWritable> {
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
new file mode 100644
index 0000000..9060bc7
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertices;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Test vertex that sets its value to its number of edges and then votes
+ * to halt.
+ */
+public class VertexCountEdges extends Vertex<IntWritable, IntWritable,
+    NullWritable, NullWritable> {
+  @Override
+  public void compute(Iterable<NullWritable> ignoredMessages)
+      throws IOException {
+    int edgeCount = getNumEdges();
+    setValue(new IntWritable(edgeCount));
+    voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
new file mode 100644
index 0000000..fac3fce
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertices;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Test vertex whose computation only votes to halt.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+public class VertexDoNothing<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> extends Vertex<I, V, E, M> {
+  /** Ignores all incoming messages and votes to halt. */
+  @Override
+  public void compute(Iterable<M> ignoredMessages) throws IOException {
+    voteToHalt();
+  }
+}