You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:05 UTC

[09/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 82e1b1e..99b28df 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -43,7 +43,6 @@ import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.worker.InputSplitsCallable;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.giraph.zk.ZooKeeperManager;
@@ -72,9 +71,10 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
 
 /**
  * The Giraph-specific business logic for a single BSP
@@ -86,11 +86,10 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class GraphTaskManager<I extends WritableComparable, V extends Writable,
-  E extends Writable, M extends Writable> implements
+  E extends Writable> implements
   ResetSuperstepMetricsObserver {
   /*if_not[PURE_YARN]
   static { // Eliminate this? Even MRv1 tasks should not need it here.
@@ -112,9 +111,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
   /** Coordination service worker */
-  private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  private CentralizedServiceWorker<I, V, E> serviceWorker;
   /** Coordination service master */
-  private CentralizedServiceMaster<I, V, E, M> serviceMaster;
+  private CentralizedServiceMaster<I, V, E> serviceMaster;
   /** Coordination service master thread */
   private Thread masterThread = null;
   /** The worker should be run exactly once, or else there is a problem. */
@@ -122,7 +121,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   /** Manages the ZooKeeper servers if necessary (dynamic startup) */
   private ZooKeeperManager zkManager;
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Already complete? */
   private boolean done = false;
   /** What kind of functions is this mapper doing? */
@@ -176,7 +175,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   public void setup(Path[] zkPathList)
     throws IOException, InterruptedException {
     context.setStatus("setup: Beginning worker setup.");
-    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
       context.getConfiguration());
     determineClassTypes(conf);
     // configure global logging level for Giraph job
@@ -233,8 +232,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     if (collectInputSuperstepStats(finishedSuperstepStats)) {
       return;
     }
-    WorkerAggregatorUsage aggregatorUsage =
-      prepareAggregatorsAndGraphState();
+    prepareGraphStateAndWorkerContext();
     List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
     int numComputeThreads = conf.getNumComputeThreads();
 
@@ -243,24 +241,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       final long superstep = serviceWorker.getSuperstep();
       GiraphTimerContext superstepTimerContext =
         getTimerForThisSuperstep(superstep);
-      GraphState<I, V, E, M> graphState =
-        new GraphState<I, V, E, M>(superstep,
-            finishedSuperstepStats.getVertexCount(),
-            finishedSuperstepStats.getEdgeCount(),
-          context, this, null, aggregatorUsage);
+      GraphState graphState = new GraphState(superstep,
+          finishedSuperstepStats.getVertexCount(),
+          finishedSuperstepStats.getEdgeCount(),
+          context);
       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
-        serviceWorker.startSuperstep(graphState);
+        serviceWorker.startSuperstep();
       if (LOG.isDebugEnabled()) {
         LOG.debug("execute: " + MemoryUtils.getRuntimeMemoryStats());
       }
       context.progress();
       serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
       context.progress();
-      graphState = checkSuperstepRestarted(
-        aggregatorUsage, superstep, graphState);
+      graphState = checkSuperstepRestarted(superstep, graphState);
       prepareForSuperstep(graphState);
       context.progress();
-      MessageStoreByPartition<I, M> messageStore =
+      MessageStoreByPartition<I, Writable> messageStore =
         serviceWorker.getServerData().getCurrentMessageStore();
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
       int numThreads = Math.min(numComputeThreads, numPartitions);
@@ -276,14 +272,14 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
           messageStore, numPartitions, numThreads);
       }
       finishedSuperstepStats = completeSuperstepAndCollectStats(
-        partitionStatsList, superstepTimerContext, graphState);
+        partitionStatsList, superstepTimerContext);
       // END of superstep compute loop
     } while (!finishedSuperstepStats.allVerticesHalted());
 
     if (LOG.isInfoEnabled()) {
       LOG.info("execute: BSP application done (global vertices marked done)");
     }
-    updateSuperstepGraphState(aggregatorUsage);
+    updateSuperstepGraphState();
     postApplication();
   }
 
@@ -364,15 +360,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
 
   /**
    * Utility to place a new, updated GraphState object into the serviceWorker.
-   * @param aggregatorUsage handle to aggregation metadata
    */
-  private void updateSuperstepGraphState(
-    WorkerAggregatorUsage aggregatorUsage) {
+  private void updateSuperstepGraphState() {
     serviceWorker.getWorkerContext().setGraphState(
-      new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
-        finishedSuperstepStats.getVertexCount(),
-          finishedSuperstepStats.getEdgeCount(), context, this, null,
-          aggregatorUsage));
+        new GraphState(serviceWorker.getSuperstep(),
+            finishedSuperstepStats.getVertexCount(),
+            finishedSuperstepStats.getEdgeCount(), context));
   }
 
   /**
@@ -380,15 +373,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * end of each superstep processing loop in the <code>execute</code> method.
    * @param partitionStatsList list of stas for each superstep to append to
    * @param superstepTimerContext for job metrics
-   * @param graphState the graph state metadata
    * @return the collected stats at the close of the current superstep.
    */
   private FinishedSuperstepStats completeSuperstepAndCollectStats(
     List<PartitionStats> partitionStatsList,
-    GiraphTimerContext superstepTimerContext,
-    GraphState<I, V, E, M> graphState) {
-    finishedSuperstepStats =
-      serviceWorker.finishSuperstep(graphState, partitionStatsList);
+    GiraphTimerContext superstepTimerContext) {
+    finishedSuperstepStats = serviceWorker.finishSuperstep(partitionStatsList);
     superstepTimerContext.stop();
     if (conf.metricsEnabled()) {
       GiraphMetrics.get().perSuperstep().printSummary(System.err);
@@ -401,7 +391,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * operations for the next superstep.
    * @param graphState graph state metadata object
    */
-  private void prepareForSuperstep(GraphState<I, V, E, M> graphState) {
+  private void prepareForSuperstep(GraphState graphState) {
     serviceWorker.prepareSuperstep();
 
     serviceWorker.getWorkerContext().setGraphState(graphState);
@@ -417,15 +407,11 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
-   * Prepare aggregators and worker context for superstep cycles.
-   * @return aggregator metadata object
+   * Prepare graph state and worker context for superstep cycles.
    */
-  private WorkerAggregatorUsage prepareAggregatorsAndGraphState() {
-    WorkerAggregatorUsage aggregatorUsage =
-      serviceWorker.getAggregatorHandler();
-    updateSuperstepGraphState(aggregatorUsage);
+  private void prepareGraphStateAndWorkerContext() {
+    updateSuperstepGraphState();
     workerContextPreApp();
-    return aggregatorUsage;
   }
 
   /**
@@ -459,18 +445,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   public void determineClassTypes(Configuration conf) {
     ImmutableClassesGiraphConfiguration giraphConf =
         new ImmutableClassesGiraphConfiguration(conf);
-    Class<? extends Vertex<I, V, E, M>> vertexClass =
-        giraphConf.getVertexClass();
-    List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
-        Vertex.class, vertexClass);
+    Class<? extends Computation<I, V, E, Writable, Writable>> computationClass =
+        giraphConf.getComputationClass();
+    List<Class<?>> classList = ReflectionUtils.<Computation>getTypeArguments(
+        Computation.class, computationClass);
     Type vertexIndexType = classList.get(0);
     Type vertexValueType = classList.get(1);
     Type edgeValueType = classList.get(2);
-    Type messageValueType = classList.get(3);
+    Type incomingMessageValueType = classList.get(3);
+    Type outgoingMessageValueType = classList.get(4);
     VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
     VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
     EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
-    MESSAGE_VALUE_CLASS.set(conf, (Class<Writable>) messageValueType);
+    INCOMING_MESSAGE_VALUE_CLASS.set(conf,
+        (Class<Writable>) incomingMessageValueType);
+    OUTGOING_MESSAGE_VALUE_CLASS.set(conf,
+        (Class<Writable>) outgoingMessageValueType);
   }
 
   /**
@@ -567,16 +557,16 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
         LOG.info("setup: Starting up BspServiceMaster " +
           "(master thread)...");
       }
-      serviceMaster = new BspServiceMaster<I, V, E, M>(
+      serviceMaster = new BspServiceMaster<I, V, E>(
         serverPortList, sessionMsecTimeout, context, this);
-      masterThread = new MasterThread<I, V, E, M>(serviceMaster, context);
+      masterThread = new MasterThread<I, V, E>(serviceMaster, context);
       masterThread.start();
     }
     if (graphFunctions.isWorker()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("setup: Starting up BspServiceWorker...");
       }
-      serviceWorker = new BspServiceWorker<I, V, E, M>(
+      serviceWorker = new BspServiceWorker<I, V, E>(
         serverPortList,
         sessionMsecTimeout,
         context,
@@ -729,8 +719,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    */
   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
       List<PartitionStats> partitionStatsList,
-      final GraphState<I, V, E, M> graphState,
-      final MessageStoreByPartition<I, M> messageStore,
+      final GraphState graphState,
+      final MessageStoreByPartition<I, Writable> messageStore,
       int numPartitions,
       int numThreads) {
     final BlockingQueue<Integer> computePartitionIdQueue =
@@ -748,7 +738,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
           @Override
           public Callable<Collection<PartitionStats>> newCallable(
               int callableId) {
-            return new ComputeCallable<I, V, E, M>(
+            return new ComputeCallable<I, V, E, Writable, Writable>(
                 context,
                 graphState,
                 messageStore,
@@ -769,14 +759,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
 
   /**
    * Handle the event that this superstep is a restart of a failed one.
-   * @param aggregatorUsage aggregator metadata
    * @param superstep current superstep
    * @param graphState the BSP graph state
    * @return the graph state, updated if this is a restart superstep
    */
-  private GraphState<I, V, E, M> checkSuperstepRestarted(
-    WorkerAggregatorUsage aggregatorUsage, long superstep,
-    GraphState<I, V, E, M> graphState) throws IOException {
+  private GraphState checkSuperstepRestarted(long superstep,
+    GraphState graphState) throws IOException {
     // Might need to restart from another superstep
     // (manually or automatic), or store a checkpoint
     if (serviceWorker.getRestartedSuperstep() == superstep) {
@@ -788,10 +776,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       finishedSuperstepStats = new FinishedSuperstepStats(0, false,
           vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
           false);
-      graphState = new GraphState<I, V, E, M>(superstep,
+      graphState = new GraphState(superstep,
           finishedSuperstepStats.getVertexCount(),
           finishedSuperstepStats.getEdgeCount(),
-          context, this, null, aggregatorUsage);
+          context);
     } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
       serviceWorker.storeCheckpoint();
     }
@@ -927,4 +915,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
           "original expection will be rethrown", e1);
     }
   }
+
+  public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
+    return conf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index 61624e5..82fbe0c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -28,30 +28,21 @@ import org.apache.giraph.edge.MutableEdgesWrapper;
 import org.apache.giraph.edge.MutableOutEdges;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.StrictRandomAccessOutEdges;
-import org.apache.giraph.partition.PartitionContext;
-import org.apache.giraph.worker.WorkerAggregatorUsage;
-import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * Basic abstract class for writing a BSP application for computation.
- * Giraph will store Vertex value and edges, hence all user data should
- * be stored as part of the vertex value.
+ * Class which holds vertex id, data and edges.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
-public abstract class Vertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements WorkerAggregatorUsage {
+public class Vertex<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E> {
   /** Vertex id. */
   private I id;
   /** Vertex value. */
@@ -60,8 +51,6 @@ public abstract class Vertex<I extends WritableComparable,
   private OutEdges<I, E> edges;
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt;
-  /** Global graph state **/
-  private GraphState<I, V, E, M> graphState;
 
   /**
    * Initialize id, value, and edges.
@@ -109,25 +98,6 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   /**
-   * Must be defined by user to do computation on a single Vertex.
-   *
-   * @param messages Messages that were sent to this vertex in the previous
-   *                 superstep.  Each message is only guaranteed to have
-   *                 a life expectancy as long as next() is not called.
-   * @throws IOException
-   */
-  public abstract void compute(Iterable<M> messages) throws IOException;
-
-  /**
-   * Retrieves the current superstep.
-   *
-   * @return Current superstep
-   */
-  public long getSuperstep() {
-    return graphState.getSuperstep();
-  }
-
-  /**
    * Get the vertex id.
    *
    * @return My vertex id.
@@ -155,26 +125,6 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   /**
-   * Get the total (all workers) number of vertices that
-   * existed in the previous superstep.
-   *
-   * @return Total number of vertices (-1 if first superstep)
-   */
-  public long getTotalNumVertices() {
-    return graphState.getTotalNumVertices();
-  }
-
-  /**
-   * Get the total (all workers) number of edges that
-   * existed in the previous superstep.
-   *
-   * @return Total number of edges (-1 if first superstep)
-   */
-  public long getTotalNumEdges() {
-    return graphState.getTotalNumEdges();
-  }
-
-  /**
    * Get a read-only view of the out-edges of this vertex.
    * Note: edge objects returned by this iterable may be invalidated as soon
    * as the next element is requested. Thus, keeping a reference to an edge
@@ -329,32 +279,6 @@ public abstract class Vertex<I extends WritableComparable,
   }
 
   /**
-   * Send a message to a vertex id.  The message should not be mutated after
-   * this method returns or else undefined results could occur.
-   *
-   * @param id Vertex id to send the message to
-   * @param message Message data to send.  Note that after the message is sent,
-   *        the user should not modify the object.
-   */
-  public void sendMessage(I id, M message) {
-    if (graphState.getWorkerClientRequestProcessor().
-          sendMessageRequest(id, message)) {
-      graphState.getGraphTaskManager().notifySentMessages();
-    }
-  }
-
-  /**
-   * Send a message to all edges.
-   *
-   * @param message Message sent to all edges.
-   */
-  public void sendMessageToAllEdges(M message) {
-    for (Edge<I, E> edge : getEdges()) {
-      sendMessage(edge.getTargetVertexId(), message);
-    }
-  }
-
-  /**
    * After this is called, the compute() code will no longer be called for
    * this vertex unless a message is sent to it.  Then the compute() code
    * will be called once again until this function is called.  The
@@ -398,115 +322,6 @@ public abstract class Vertex<I extends WritableComparable,
     edges.remove(targetVertexId);
   }
 
-  /**
-   * Sends a request to create a vertex that will be available during the
-   * next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   * @param edges Initial edges
-   */
-  public void addVertexRequest(I id, V value, OutEdges<I, E> edges)
-    throws IOException {
-    Vertex<I, V, E, M> vertex = getConf().createVertex();
-    vertex.initialize(id, value, edges);
-    graphState.getWorkerClientRequestProcessor().addVertexRequest(vertex);
-  }
-
-  /**
-   * Sends a request to create a vertex that will be available during the
-   * next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   */
-  public void addVertexRequest(I id, V value) throws IOException {
-    addVertexRequest(id, value, getConf().createAndInitializeOutEdges());
-  }
-
-  /**
-   * Request to remove a vertex from the graph
-   * (applied just prior to the next superstep).
-   *
-   * @param vertexId Id of the vertex to be removed.
-   */
-  public void removeVertexRequest(I vertexId) throws IOException {
-    graphState.getWorkerClientRequestProcessor().
-        removeVertexRequest(vertexId);
-  }
-
-  /**
-   * Request to add an edge of a vertex in the graph
-   * (processed just prior to the next superstep)
-   *
-   * @param sourceVertexId Source vertex id of edge
-   * @param edge Edge to add
-   */
-  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
-    throws IOException {
-    graphState.getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, edge);
-  }
-
-  /**
-   * Request to remove all edges from a given source vertex to a given target
-   * vertex (processed just prior to the next superstep).
-   *
-   * @param sourceVertexId Source vertex id
-   * @param targetVertexId Target vertex id
-   */
-  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
-    throws IOException {
-    graphState.getWorkerClientRequestProcessor().
-        removeEdgesRequest(sourceVertexId, targetVertexId);
-  }
-
-  /**
-   * Set the graph state for all workers
-   *
-   * @param graphState Graph state for all workers
-   */
-  public void setGraphState(GraphState<I, V, E, M> graphState) {
-    this.graphState = graphState;
-  }
-
-  /**
-   * Get the mapper context
-   *
-   * @return Mapper context
-   */
-  public Mapper.Context getContext() {
-    return graphState.getContext();
-  }
-
-  /**
-   * Get the partition context
-   *
-   * @return Partition context
-   */
-  public PartitionContext getPartitionContext() {
-    return graphState.getPartitionContext();
-  }
-
-  /**
-   * Get the worker context
-   *
-   * @return WorkerContext context
-   */
-  public WorkerContext getWorkerContext() {
-    return graphState.getGraphTaskManager().getWorkerContext();
-  }
-
-  @Override
-  public <A extends Writable> void aggregate(String name, A value) {
-    graphState.getWorkerAggregatorUsage().aggregate(name, value);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return graphState.getWorkerAggregatorUsage().<A>getAggregatedValue(name);
-  }
-
   @Override
   public String toString() {
     return "Vertex(id=" + getId() + ",value=" + getValue() +

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
index 9474636..3d09c06 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
@@ -31,18 +31,17 @@ import java.util.List;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public interface VertexChanges<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /**
    * Get the added vertices for this particular vertex index from the previous
    * superstep.
    *
    * @return List of vertices for this vertex index.
    */
-  List<Vertex<I, V, E, M>> getAddedVertexList();
+  List<Vertex<I, V, E>> getAddedVertexList();
 
   /**
    * Get the number of times this vertex was removed in the previous

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index 75c0aef..6f54dc7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -41,15 +41,13 @@ import java.util.List;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public class VertexMutations<I extends WritableComparable,
-    V extends Writable, E extends Writable,
-    M extends Writable> implements VertexChanges<I, V, E, M>,
+    V extends Writable, E extends Writable> implements VertexChanges<I, V, E>,
     Writable, ImmutableClassesGiraphConfigurable {
   /** List of added vertices during the last superstep */
-  private final List<Vertex<I, V, E, M>> addedVertexList = Lists.newArrayList();
+  private final List<Vertex<I, V, E>> addedVertexList = Lists.newArrayList();
   /** Count of remove vertex requests */
   private int removedVertexCount = 0;
   /** List of added edges */
@@ -57,15 +55,15 @@ public class VertexMutations<I extends WritableComparable,
   /** List of removed edges */
   private final List<I> removedEdgeList = Lists.newArrayList();
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
 
   /**
    * Copy the vertex mutations.
    *
    * @return Copied vertex mutations
    */
-  public VertexMutations<I, V, E, M> copy() {
-    VertexMutations<I, V, E, M> copied = new VertexMutations<I, V, E, M>();
+  public VertexMutations<I, V, E> copy() {
+    VertexMutations<I, V, E> copied = new VertexMutations<I, V, E>();
     copied.addedVertexList.addAll(this.addedVertexList);
     copied.removedVertexCount = this.removedVertexCount;
     copied.addedEdgeList.addAll(this.addedEdgeList);
@@ -75,7 +73,7 @@ public class VertexMutations<I extends WritableComparable,
   }
 
   @Override
-  public List<Vertex<I, V, E, M>> getAddedVertexList() {
+  public List<Vertex<I, V, E>> getAddedVertexList() {
     return addedVertexList;
   }
 
@@ -87,7 +85,7 @@ public class VertexMutations<I extends WritableComparable,
 
     int addedVertexListSize = input.readInt();
     for (int i = 0; i < addedVertexListSize; ++i) {
-      Vertex<I, V, E, M> vertex =
+      Vertex<I, V, E> vertex =
           WritableUtils.readVertexFromDataInput(input, getConf());
       addedVertexList.add(vertex);
     }
@@ -109,7 +107,7 @@ public class VertexMutations<I extends WritableComparable,
   @Override
   public void write(DataOutput output) throws IOException {
     output.writeInt(addedVertexList.size());
-    for (Vertex<I, V, E, M> vertex : addedVertexList) {
+    for (Vertex<I, V, E> vertex : addedVertexList) {
       WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
     }
     output.writeInt(removedVertexCount);
@@ -129,7 +127,7 @@ public class VertexMutations<I extends WritableComparable,
    *
    * @param vertex Vertex to be added
    */
-  public void addVertex(Vertex<I, V, E, M> vertex) {
+  public void addVertex(Vertex<I, V, E> vertex) {
     addedVertexList.add(vertex);
   }
 
@@ -178,7 +176,7 @@ public class VertexMutations<I extends WritableComparable,
    *
    * @param vertexMutations Object to be added
    */
-  public void addVertexMutations(VertexMutations<I, V, E, M> vertexMutations) {
+  public void addVertexMutations(VertexMutations<I, V, E> vertexMutations) {
     addedVertexList.addAll(vertexMutations.getAddedVertexList());
     removedVertexCount += vertexMutations.getRemovedVertexCount();
     addedEdgeList.addAll(vertexMutations.getAddedEdgeList());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
index 1fc0ddc..b6659f4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
@@ -28,12 +28,10 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface VertexResolver<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends GraphStateAware<I, V, E, M> {
+    V extends Writable, E extends Writable> {
   /**
    * A vertex may have been removed, created zero or more times and had
    * zero or more messages sent to it.  This method will handle all situations
@@ -47,15 +45,8 @@ public interface VertexResolver<I extends WritableComparable,
    * @return Vertex to be returned, if null, and a vertex currently exists
    *         it will be removed
    */
-  Vertex<I, V, E, M> resolve(I vertexId,
-      Vertex<I, V, E, M> vertex,
-      VertexChanges<I, V, E, M> vertexChanges,
+  Vertex<I, V, E> resolve(I vertexId,
+      Vertex<I, V, E> vertex,
+      VertexChanges<I, V, E> vertexChanges,
       boolean hasMessages);
-
-  /**
-   * Set the graph state.
-   *
-   * @param graphState Graph state saved.
-   */
-  void setGraphState(GraphState<I, V, E, M> graphState);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
index e62bb01..eb9197c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexValueFactory.java
@@ -36,7 +36,7 @@ public interface VertexValueFactory<V extends Writable> {
    * @param configuration Configuration
    */
   void initialize(
-      ImmutableClassesGiraphConfiguration<?, V, ?, ?> configuration);
+      ImmutableClassesGiraphConfiguration<?, V, ?> configuration);
 
   /**
    * Create a new vertex value.

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
index c03d718..ebc62f6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
@@ -42,12 +42,11 @@ import org.apache.log4j.Logger;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class SuperstepHashPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends HashPartitionerFactory<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends HashPartitionerFactory<I, V, E> {
   /**
    * Changes the {@link HashMasterPartitioner} to make ownership of the
    * partitions based on a superstep.  For testing only as it is totally
@@ -56,11 +55,10 @@ public class SuperstepHashPartitionerFactory<I extends WritableComparable,
    * @param <I> vertex id
    * @param <V> vertex data
    * @param <E> edge data
-   * @param <M> message data
    */
   private static class SuperstepMasterPartition<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      extends HashMasterPartitioner<I, V, E, M> {
+      V extends Writable, E extends Writable>
+      extends HashMasterPartitioner<I, V, E> {
     /** Class logger */
     private static Logger LOG =
         Logger.getLogger(SuperstepMasterPartition.class);
@@ -120,8 +118,8 @@ public class SuperstepHashPartitionerFactory<I extends WritableComparable,
   }
 
   @Override
-  public MasterGraphPartitioner<I, V, E, M>
+  public MasterGraphPartitioner<I, V, E>
   createMasterGraphPartitioner() {
-    return new SuperstepMasterPartition<I, V, E, M>(getConf());
+    return new SuperstepMasterPartition<I, V, E>(getConf());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 6d87bf2..a1af98f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 @SuppressWarnings("rawtypes")
 public abstract class EdgeReader<I extends WritableComparable,
     E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
-        I, Writable, E, Writable> {
+        I, Writable, E> {
   /**
    * Use the input split and context to setup reading the edges.
    * Guaranteed to be called prior to any other function.

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
index 58d79a6..86e86d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/GiraphInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.ReflectionUtils;
  */
 public abstract class GiraphInputFormat<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
   /**
    * Get the list of input splits for the format.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
index e4c3496..41a049f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/SimpleVertexWriter.java
@@ -40,6 +40,6 @@ public interface SimpleVertexWriter<I extends WritableComparable,
    * @throws IOException
    * @throws InterruptedException
    */
-  void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+  void writeVertex(Vertex<I, V, E> vertex) throws IOException,
       InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
index 154f7e4..ad00a8e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexOutputFormat.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 public abstract class VertexOutputFormat<
     I extends WritableComparable, V extends Writable,
     E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
   /**
    * Create a vertex writer for a given split. The framework will call
    * {@link VertexWriter#initialize(TaskAttemptContext)} before

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 3f6bb3f..9695169 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 @SuppressWarnings("rawtypes")
 public abstract class VertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable> {
+    DefaultImmutableClassesGiraphConfigurable<I, V, E> {
   /**
    * Use the input split and context to setup reading the vertices.
    * Guaranteed to be called prior to any other function.
@@ -68,7 +68,7 @@ public abstract class VertexReader<I extends WritableComparable,
    * @throws IOException
    * @throws InterruptedException
    */
-  public abstract Vertex<I, V, E, ?> getCurrentVertex()
+  public abstract Vertex<I, V, E> getCurrentVertex()
     throws IOException, InterruptedException;
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
index 0b06a4a..70e721e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
@@ -39,9 +39,9 @@ public abstract class VertexValueReader<I extends WritableComparable,
   }
 
   @Override
-  public final Vertex<I, V, Writable, ?> getCurrentVertex() throws IOException,
+  public final Vertex<I, V, Writable> getCurrentVertex() throws IOException,
       InterruptedException {
-    Vertex<I, V, Writable, ?> vertex = getConf().createVertex();
+    Vertex<I, V, Writable> vertex = getConf().createVertex();
     vertex.initialize(getCurrentVertexId(), getCurrentVertexValue());
     return vertex;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
index a4285c1..69fdfc5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 @SuppressWarnings("rawtypes")
 public abstract class VertexWriter<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
     implements SimpleVertexWriter<I, V, E> {
   /**
    * Use the context to setup writing the vertices.

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
index ad52496..40ab922 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public class DefaultEdgeInputFilter<I extends WritableComparable,
     E extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E>
     implements EdgeInputFilter<I, E> {
   @Override
   public boolean dropEdge(I sourceId, Edge<I, E> edge) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
index 2976cbc..f35ade5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.io.WritableComparable;
  */
 public class DefaultVertexInputFilter<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
-    implements VertexInputFilter<I, V, E, M> {
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements VertexInputFilter<I, V, E> {
   @Override
-  public boolean dropVertex(Vertex<I, V, E, M> vertex) {
+  public boolean dropVertex(Vertex<I, V, E> vertex) {
     return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
index d9af103..c7d178e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java
@@ -27,16 +27,15 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex ID
  * @param <V> Vertex Value
  * @param <E> Edge Value
- * @param <M> Message Value
  */
 public interface VertexInputFilter<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /**
    * Whether to drop a vertex on input.
    *
    * @param vertex to check
    * @return true if we should drop vertex
    */
-  boolean dropVertex(Vertex<I, V, E, M> vertex);
+  boolean dropVertex(Vertex<I, V, E> vertex);
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
index f71ef25..8efbfd0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
@@ -69,7 +69,7 @@ public class AdjacencyListTextVertexOutputFormat<I extends WritableComparable,
     }
 
     @Override
-    public Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+    public Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException {
       StringBuffer sb = new StringBuffer(vertex.getId().toString());
       sb.append(delimiter);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index 6dd7468..bd69586 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -75,7 +75,7 @@ public class IdWithValueTextOutputFormat<I extends WritableComparable,
     }
 
     @Override
-    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+    protected Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException {
       String first;
       String second;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
index a7dbef8..1038a32 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntTextVertexValueInputFormat.java
@@ -35,11 +35,9 @@ import java.util.regex.Pattern;
  * Each line consists of: id, value
  *
  * @param <E> Edge value
- * @param <M> Message data
  */
-public class IntIntTextVertexValueInputFormat<E extends Writable,
-    M extends Writable> extends
-    TextVertexValueInputFormat<IntWritable, IntWritable, E, M> {
+public class IntIntTextVertexValueInputFormat<E extends Writable> extends
+    TextVertexValueInputFormat<IntWritable, IntWritable, E> {
   /** Separator for id and value */
   private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
index 7d8fcf6..a71cf96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
@@ -60,7 +60,7 @@ public class JsonBase64VertexOutputFormat<I extends WritableComparable,
   protected class JsonBase64VertexWriter extends TextVertexWriterToEachLine {
 
     @Override
-    protected Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+    protected Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException {
       ByteArrayOutputStream outputStream =
           new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
index 7dfd607..112860c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -95,9 +95,8 @@ public class JsonLongDoubleFloatDoubleVertexInputFormat extends
     }
 
     @Override
-    protected Vertex<LongWritable, DoubleWritable, FloatWritable,
-              DoubleWritable> handleException(Text line, JSONArray jsonVertex,
-                  JSONException e) {
+    protected Vertex<LongWritable, DoubleWritable, FloatWritable>
+    handleException(Text line, JSONArray jsonVertex, JSONException e) {
       throw new IllegalArgumentException(
           "Couldn't get vertex from line " + line, e);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
index d0a3305..13c80aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
@@ -52,8 +52,7 @@ public class JsonLongDoubleFloatDoubleVertexOutputFormat extends
     TextVertexWriterToEachLine {
     @Override
     public Text convertVertexToLine(
-      Vertex<LongWritable, DoubleWritable,
-        FloatWritable, ?> vertex
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex
     ) throws IOException {
       JSONArray jsonVertex = new JSONArray();
       try {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
index c9390ba..d8abfdb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomIntNullVertexInputFormat.java
@@ -116,9 +116,9 @@ public class PseudoRandomIntNullVertexInputFormat extends
     }
 
     @Override
-    public Vertex<IntWritable, FloatWritable, NullWritable, ?>
+    public Vertex<IntWritable, FloatWritable, NullWritable>
     getCurrentVertex() throws IOException, InterruptedException {
-      Vertex<IntWritable, FloatWritable, NullWritable, ?> vertex =
+      Vertex<IntWritable, FloatWritable, NullWritable> vertex =
           getConf().createVertex();
       int vertexId = startingVertexId + verticesRead;
       OutEdges<IntWritable, NullWritable> edges =

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 5d293eb..91a19e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -133,9 +133,9 @@ public class PseudoRandomVertexInputFormat extends
     }
 
     @Override
-    public Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+    public Vertex<LongWritable, DoubleWritable, DoubleWritable>
     getCurrentVertex() throws IOException, InterruptedException {
-      Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+      Vertex<LongWritable, DoubleWritable, DoubleWritable>
       vertex = getConf().createVertex();
       long vertexId = startingVertexId + verticesRead;
       // Seed on the vertex id to keep the vertex data the same when

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
index 0f2d929..1d31f4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  */
 @SuppressWarnings("rawtypes")
 public class SequenceFileVertexInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, X extends Vertex<I, V, E, ?>>
+    V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
     extends VertexInputFormat<I, V, E> {
   /** Internal input format */
   protected SequenceFileInputFormat<I, X> sequenceFileInputFormat =
@@ -68,7 +68,7 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
    * @param <X> Value type
    */
   public static class SequenceFileVertexReader<I extends WritableComparable,
-      V extends Writable, E extends Writable, X extends Vertex<I, V, E, ?>>
+      V extends Writable, E extends Writable, X extends Vertex<I, V, E>>
       extends VertexReader<I, V, E> {
     /** Internal record reader from {@link SequenceFileInputFormat} */
     private final RecordReader<I, X> recordReader;
@@ -92,7 +92,7 @@ public class SequenceFileVertexInputFormat<I extends WritableComparable,
       return recordReader.nextKeyValue();
     }
 
-    @Override public Vertex<I, V, E, ?> getCurrentVertex()
+    @Override public Vertex<I, V, E> getCurrentVertex()
       throws IOException, InterruptedException {
       return recordReader.getCurrentValue();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
index c4ed65c..c6ff6d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
@@ -105,7 +105,7 @@ public abstract class SequenceFileVertexOutputFormat<
     }
 
     @Override
-    public final void writeVertex(Vertex<I, V, E, ?> vertex) throws
+    public final void writeVertex(Vertex<I, V, E> vertex) throws
       IOException, InterruptedException {
       // Convert vertex id to type OK.
       OK outKey = convertToSequenceFileKey(vertex.getId());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
index 6e62b71..17174a3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -21,7 +21,6 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -29,10 +28,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * Class to read graphs stored as adjacency lists with ids represented by
  * Strings and values as doubles.  This is a good inputformat for reading
  * graphs where the id types do not matter and can be stashed in a String.
- *
- * @param <M> Message type.
  */
-public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+public class TextDoubleDoubleAdjacencyListVertexInputFormat
     extends AdjacencyListTextVertexInputFormat<Text, DoubleWritable,
             DoubleWritable>  {
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
index b08e6f7..debdccc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -154,10 +154,10 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
     TextVertexReader {
 
     @Override
-    public final Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+    public final Vertex<I, V, E> getCurrentVertex() throws IOException,
     InterruptedException {
       Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, ?> vertex = getConf().createVertex();
+      Vertex<I, V, E> vertex = getConf().createVertex();
       vertex.initialize(getId(line), getValue(line), getEdges(line));
       return vertex;
     }
@@ -222,10 +222,10 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
     }
 
     @Override
-    public final Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+    public final Vertex<I, V, E> getCurrentVertex() throws IOException,
     InterruptedException {
       Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, ?> vertex;
+      Vertex<I, V, E> vertex;
       T processed = preprocessLine(line);
       vertex = getConf().createVertex();
       vertex.initialize(getId(processed), getValue(processed),
@@ -306,11 +306,11 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
 
     @SuppressWarnings("unchecked")
     @Override
-    public final Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+    public final Vertex<I, V, E> getCurrentVertex() throws IOException,
         InterruptedException {
       // Note we are reading from value only since key is the line number
       Text line = getRecordReader().getCurrentValue();
-      Vertex<I, V, E, ?> vertex;
+      Vertex<I, V, E> vertex;
       T processed = null;
       try {
         processed = preprocessLine(line);
@@ -401,7 +401,7 @@ public abstract class TextVertexInputFormat<I extends WritableComparable,
      *          the exception thrown while reading the line
      * @return the recovered/alternative vertex to be used
      */
-    protected Vertex<I, V, E, ?> handleException(Text line, T processed, X e) {
+    protected Vertex<I, V, E> handleException(Text line, T processed, X e) {
       throw new IllegalArgumentException(e);
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index a3073f9..c91d543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -158,7 +158,7 @@ public abstract class TextVertexOutputFormat<I extends WritableComparable,
      * @throws IOException
      *           exception that can be thrown while writing
      */
-    protected abstract Text convertVertexToLine(Vertex<I, V, E, ?> vertex)
+    protected abstract Text convertVertexToLine(Vertex<I, V, E> vertex)
       throws IOException;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
index 6d133ae..e960444 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexValueInputFormat.java
@@ -39,11 +39,10 @@ import java.util.List;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public abstract class TextVertexValueInputFormat<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     extends VertexValueInputFormat<I, V> {
   /** Uses the GiraphTextInputFormat to do everything */
   protected GiraphTextInputFormat textInputFormat = new GiraphTextInputFormat();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
index 569cee9..4b48e63 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/EdgeInputFormatDescription.java
@@ -78,11 +78,11 @@ public class EdgeInputFormatDescription<I extends WritableComparable,
    * @param conf Configuration which we want to create a copy from
    * @return Copy of configuration
    */
-  private ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>
+  private ImmutableClassesGiraphConfiguration<I, Writable, E>
   createConfigurationCopy(
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
-    ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
-        new ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>(conf);
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
+    ImmutableClassesGiraphConfiguration<I, Writable, E> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, Writable, E>(conf);
     confCopy.setEdgeInputFormatClass(getInputFormatClass());
     putParametersToConfiguration(confCopy);
     return confCopy;
@@ -130,13 +130,13 @@ public class EdgeInputFormatDescription<I extends WritableComparable,
    */
   public static <I extends WritableComparable,
       E extends Writable> List<EdgeInputFormat<I, E>> createEdgeInputFormats(
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
     List<EdgeInputFormatDescription<I, E>> descriptions =
         getEdgeInputFormatDescriptions(conf);
     List<EdgeInputFormat<I, E>> edgeInputFormats =
         Lists.newArrayListWithCapacity(descriptions.size());
     for (EdgeInputFormatDescription<I, E> description : descriptions) {
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> confCopy =
+      ImmutableClassesGiraphConfiguration<I, Writable, E> confCopy =
           description.createConfigurationCopy(conf);
       edgeInputFormats.add(confCopy.createWrappedEdgeInputFormat());
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
index c377fbc..fa8839b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java
@@ -48,7 +48,7 @@ public class MultiEdgeInputFormat<I extends WritableComparable,
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
     super.setConf(conf);
     edgeInputFormats =
         EdgeInputFormatDescription.createEdgeInputFormats(getConf());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
index 72929d9..e851e38 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java
@@ -49,7 +49,7 @@ public class MultiVertexInputFormat<I extends WritableComparable,
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     super.setConf(conf);
     vertexInputFormats =
         VertexInputFormatDescription.createVertexInputFormats(getConf());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
index bdd5a74..1487749 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/VertexInputFormatDescription.java
@@ -80,11 +80,11 @@ public class VertexInputFormatDescription<I extends WritableComparable,
    * @param conf Configuration which we want to create a copy from
    * @return Copy of configuration
    */
-  private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+  private ImmutableClassesGiraphConfiguration<I, V, E>
   createConfigurationCopy(
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
-    ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
-        new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(conf);
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
+        new ImmutableClassesGiraphConfiguration<I, V, E>(conf);
     confCopy.setVertexInputFormatClass(getInputFormatClass());
     putParametersToConfiguration(confCopy);
     return confCopy;
@@ -136,13 +136,13 @@ public class VertexInputFormatDescription<I extends WritableComparable,
   public static <I extends WritableComparable, V extends Writable,
       E extends Writable>
   List<VertexInputFormat<I, V, E>> createVertexInputFormats(
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     List<VertexInputFormatDescription<I, V, E>> descriptions =
         getVertexInputFormatDescriptions(conf);
     List<VertexInputFormat<I, V, E>> vertexInputFormats =
         Lists.newArrayListWithCapacity(descriptions.size());
     for (VertexInputFormatDescription<I, V, E> description : descriptions) {
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> confCopy =
+      ImmutableClassesGiraphConfiguration<I, V, E> confCopy =
           description.createConfigurationCopy(conf);
       vertexInputFormats.add(confCopy.createWrappedVertexInputFormat());
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index c0a2cd1..aae7a72 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -50,7 +50,7 @@ public class WrappedEdgeReader<I extends WritableComparable,
    * @param conf Configuration
    */
   public WrappedEdgeReader(EdgeReader<I, E> baseEdgeReader,
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
     this.baseEdgeReader = baseEdgeReader;
     super.setConf(conf);
     baseEdgeReader.setConf(conf);
@@ -58,7 +58,7 @@ public class WrappedEdgeReader<I extends WritableComparable,
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
     // We don't want to use external configuration
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
index 8110209..bffa330 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexOutputFormat.java
@@ -70,7 +70,7 @@ public class WrappedVertexOutputFormat<I extends WritableComparable,
     return new VertexWriter<I, V, E>() {
       @Override
       public void setConf(
-          ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+          ImmutableClassesGiraphConfiguration<I, V, E> conf) {
         super.setConf(conf);
         vertexWriter.setConf(conf);
       }
@@ -91,7 +91,7 @@ public class WrappedVertexOutputFormat<I extends WritableComparable,
 
       @Override
       public void writeVertex(
-          Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+          Vertex<I, V, E> vertex) throws IOException, InterruptedException {
         vertexWriter.writeVertex(vertex);
       }
     };

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 3a8ac50..54adfec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -51,7 +51,7 @@ public class WrappedVertexReader<I extends WritableComparable,
    * @param conf Configuration
    */
   public WrappedVertexReader(VertexReader<I, V, E> baseVertexReader,
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     this.baseVertexReader = baseVertexReader;
     super.setConf(conf);
     baseVertexReader.setConf(conf);
@@ -59,7 +59,7 @@ public class WrappedVertexReader<I extends WritableComparable,
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     // We don't want to use external configuration
   }
 
@@ -76,7 +76,7 @@ public class WrappedVertexReader<I extends WritableComparable,
   }
 
   @Override
-  public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+  public Vertex<I, V, E> getCurrentVertex() throws IOException,
       InterruptedException {
     return baseVertexReader.getCurrentVertex();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
index efd8fe7..f61ac45 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/EdgeReaderWrapper.java
@@ -53,7 +53,7 @@ public class EdgeReaderWrapper<I extends WritableComparable,
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
     super.setConf(conf);
     conf.configureIfPossible(edgeReader);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
index 614f945..ca35c51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/VertexReaderWrapper.java
@@ -38,23 +38,23 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 public class VertexReaderWrapper<I extends WritableComparable,
     V extends Writable, E extends Writable> extends VertexReader<I, V, E> {
   /** Wrapped vertex reader */
-  private GiraphReader<Vertex<I, V, E, ?>> vertexReader;
+  private GiraphReader<Vertex<I, V, E>> vertexReader;
   /** {@link VertexReader}-like wrapper of {@link #vertexReader} */
-  private IteratorToReaderWrapper<Vertex<I, V, E, ?>> iterator;
+  private IteratorToReaderWrapper<Vertex<I, V, E>> iterator;
 
   /**
    * Constructor
    *
    * @param vertexReader GiraphReader for vertices to wrap
    */
-  public VertexReaderWrapper(GiraphReader<Vertex<I, V, E, ?>> vertexReader) {
+  public VertexReaderWrapper(GiraphReader<Vertex<I, V, E>> vertexReader) {
     this.vertexReader = vertexReader;
-    iterator = new IteratorToReaderWrapper<Vertex<I, V, E, ?>>(vertexReader);
+    iterator = new IteratorToReaderWrapper<Vertex<I, V, E>>(vertexReader);
   }
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) {
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     super.setConf(conf);
     conf.configureIfPossible(vertexReader);
   }
@@ -65,7 +65,7 @@ public class VertexReaderWrapper<I extends WritableComparable,
   }
 
   @Override
-  public Vertex<I, V, E, ?> getCurrentVertex() throws IOException,
+  public Vertex<I, V, E> getCurrentVertex() throws IOException,
       InterruptedException {
     return iterator.getCurrentObject();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
index a8dff87..452d93a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/MultiThreadedSuperstepOutput.java
@@ -47,7 +47,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
   /** Mapper context */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, ?> configuration;
+  private ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /** Vertex output format, used to get new vertex writers */
   private final VertexOutputFormat<I, V, E> vertexOutputFormat;
   /**
@@ -65,7 +65,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
    * @param context Mapper context
    */
   public MultiThreadedSuperstepOutput(
-      ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context) {
     this.configuration = conf;
     vertexOutputFormat = conf.createWrappedVertexOutputFormat();
@@ -80,9 +80,7 @@ public class MultiThreadedSuperstepOutput<I extends WritableComparable,
     if (availableVertexWriters.isEmpty()) {
       try {
         vertexWriter = vertexOutputFormat.createVertexWriter(context);
-        vertexWriter.setConf(
-            (ImmutableClassesGiraphConfiguration<I, V, E, Writable>)
-                configuration);
+        vertexWriter.setConf(configuration);
         vertexWriter.initialize(context);
       } catch (IOException e) {
         throw new IllegalStateException("getVertexWriter: " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
index 82684b2..be981cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/NoOpSuperstepOutput.java
@@ -40,7 +40,7 @@ public class NoOpSuperstepOutput<I extends WritableComparable,
   public SimpleVertexWriter<I, V, E> getVertexWriter() {
     return new SimpleVertexWriter<I, V, E>() {
       @Override
-      public void writeVertex(Vertex<I, V, E, ?> vertex) throws IOException,
+      public void writeVertex(Vertex<I, V, E> vertex) throws IOException,
           InterruptedException {
       }
     };

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
index f94bd56..7f233e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/superstep_output/SynchronizedSuperstepOutput.java
@@ -57,14 +57,13 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
    */
   @SuppressWarnings("unchecked")
   public SynchronizedSuperstepOutput(
-      ImmutableClassesGiraphConfiguration<I, V, E, ?> conf,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
     try {
       vertexWriter =
           conf.createWrappedVertexOutputFormat().createVertexWriter(context);
-      vertexWriter.setConf(
-          (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) conf);
+      vertexWriter.setConf(conf);
       vertexWriter.initialize(context);
     } catch (IOException e) {
       throw new IllegalStateException("SynchronizedSuperstepOutput: " +
@@ -76,7 +75,7 @@ public class SynchronizedSuperstepOutput<I extends WritableComparable,
     simpleVertexWriter = new SimpleVertexWriter<I, V, E>() {
       @Override
       public synchronized void writeVertex(
-          Vertex<I, V, E, ?> vertex) throws IOException, InterruptedException {
+          Vertex<I, V, E> vertex) throws IOException, InterruptedException {
         vertexWriter.writeVertex(vertex);
       }
     };

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index cc6b126..de17157 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -19,12 +19,12 @@
 package org.apache.giraph.job;
 
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueFactory;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -51,10 +51,12 @@ import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
  * @param <I> the Vertex ID type
  * @param <V> the Vertex Value type
  * @param <E> the Edge Value type
- * @param <M> the Message type
+ * @param <M1> the incoming Message type
+ * @param <M2> the outgoing Message type
  */
 public class GiraphConfigurationValidator<I extends WritableComparable,
-  V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable, M1 extends Writable,
+    M2 extends Writable> {
   /**
    * Class logger object.
    */
@@ -67,8 +69,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   private static final int VALUE_PARAM_INDEX = 1;
   /** E param vertex index in classList */
   private static final int EDGE_PARAM_INDEX = 2;
-  /** M param vertex index in classList */
-  private static final int MSG_PARAM_INDEX = 3;
+  /** M2 param vertex index in classList */
+  private static final int OUTGOING_MSG_PARAM_INDEX = 4;
   /** M param vertex combiner index in classList */
   private static final int MSG_COMBINER_PARAM_INDEX = 1;
   /** E param edge input format index in classList */
@@ -80,12 +82,12 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
 
   /** Vertex Index Type */
   private Type vertexIndexType;
-  /** Vertex Index Type */
+  /** Vertex Value Type */
   private Type vertexValueType;
-  /** Vertex Index Type */
+  /** Edge Value Type */
   private Type edgeValueType;
-  /** Vertex Index Type */
-  private Type messageValueType;
+  /** Outgoing Message Type */
+  private Type outgoingMessageValueType;
 
   /**
    * The Configuration object for use in the validation test.
@@ -110,14 +112,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    */
   public void validateConfiguration() {
     checkConfiguration();
-    Class<? extends Vertex<I, V, E, M>> vertexClass =
-      conf.getVertexClass();
+    Class<? extends Computation<I, V, E, M1, M2>> computationClass =
+      conf.getComputationClass();
     List<Class<?>> classList = ReflectionUtils.getTypeArguments(
-      Vertex.class, vertexClass);
+      Computation.class, computationClass);
     vertexIndexType = classList.get(ID_PARAM_INDEX);
     vertexValueType = classList.get(VALUE_PARAM_INDEX);
     edgeValueType = classList.get(EDGE_PARAM_INDEX);
-    messageValueType = classList.get(MSG_PARAM_INDEX);
+    outgoingMessageValueType = classList.get(OUTGOING_MSG_PARAM_INDEX);
     verifyOutEdgesGenericTypes();
     verifyVertexInputFormatGenericTypes();
     verifyEdgeInputFormatGenericTypes();
@@ -146,9 +148,9 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
       throw new IllegalArgumentException("checkConfiguration: No valid " +
           GiraphConstants.MIN_WORKERS);
     }
-    if (conf.getVertexClass() == null) {
+    if (conf.getComputationClass() == null) {
       throw new IllegalArgumentException("checkConfiguration: Null " +
-          GiraphConstants.VERTEX_CLASS.getKey());
+          GiraphConstants.COMPUTATION_CLASS.getKey());
     }
     if (conf.getVertexInputFormatClass() == null &&
         conf.getEdgeInputFormatClass() == null) {
@@ -281,7 +283,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
 
   /** If there is a combiner type, verify its generic params match the job. */
   private void verifyVertexCombinerGenericTypes() {
-    Class<? extends Combiner<I, M>> vertexCombinerClass =
+    Class<? extends Combiner<I, M2>> vertexCombinerClass =
       conf.getCombinerClass();
     if (vertexCombinerClass != null) {
       List<Class<?>> classList =
@@ -293,10 +295,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
             "vertex - " + vertexIndexType +
             ", vertex combiner - " + classList.get(ID_PARAM_INDEX));
       }
-      if (!messageValueType.equals(classList.get(MSG_COMBINER_PARAM_INDEX))) {
+      if (!outgoingMessageValueType.equals(
+          classList.get(MSG_COMBINER_PARAM_INDEX))) {
         throw new IllegalArgumentException(
           "checkClassTypes: Message value types don't match, " +
-            "vertex - " + messageValueType +
+            "vertex - " + outgoingMessageValueType +
             ", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
       }
     }
@@ -360,14 +363,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   /** If there is a vertex resolver,
    * validate the generic parameter types. */
   private void verifyVertexResolverGenericTypes() {
-    Class<? extends VertexResolver<I, V, E, M>>
+    Class<? extends VertexResolver<I, V, E>>
       vrClass = conf.getVertexResolverClass();
     if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
       return;
     }
-    Class<? extends DefaultVertexResolver<I, V, E, M>>
+    Class<? extends DefaultVertexResolver<I, V, E>>
       dvrClass =
-        (Class<? extends DefaultVertexResolver<I, V, E, M>>) vrClass;
+        (Class<? extends DefaultVertexResolver<I, V, E>>) vrClass;
     List<Class<?>> classList =
       ReflectionUtils.getTypeArguments(
           DefaultVertexResolver.class, dvrClass);
@@ -392,13 +395,6 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
           "vertex - " + edgeValueType +
           ", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
     }
-    if (classList.get(MSG_PARAM_INDEX) != null &&
-      !messageValueType.equals(classList.get(MSG_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Message value types don't match, " +
-          "vertex - " + messageValueType +
-          ", vertex resolver - " + classList.get(MSG_PARAM_INDEX));
-    }
   }
 }