You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:04 UTC

[08/12] GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index f00116a..bd48116 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -30,13 +30,13 @@ import org.apache.giraph.comm.netty.NettyMasterServer;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.counters.GiraphStats;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.InputSplitPaths;
 import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.GraphFunctions;
 import org.apache.giraph.graph.InputSplitEvents;
 import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.graph.GraphTaskManager;
@@ -116,13 +116,12 @@ import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class BspServiceMaster<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends BspService<I, V, E, M>
-    implements CentralizedServiceMaster<I, V, E, M>,
+    V extends Writable, E extends Writable>
+    extends BspService<I, V, E>
+    implements CentralizedServiceMaster<I, V, E>,
     ResetSuperstepMetricsObserver {
   /** Print worker names only if there are 10 workers left */
   public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
@@ -158,7 +157,7 @@ public class BspServiceMaster<I extends WritableComparable,
   /** State of the superstep changed */
   private final BspEvent superstepStateChanged;
   /** Master graph partitioner */
-  private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+  private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
   /** All the partition stats from the last superstep */
   private final List<PartitionStats> allPartitionStatsList =
       new ArrayList<PartitionStats>();
@@ -195,14 +194,15 @@ public class BspServiceMaster<I extends WritableComparable,
       String serverPortList,
       int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
-      GraphTaskManager<I, V, E, M> graphTaskManager) {
+      GraphTaskManager<I, V, E> graphTaskManager) {
     super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
     workerWroteCheckpoint = new PredicateLock(context);
     registerBspEvent(workerWroteCheckpoint);
     superstepStateChanged = new PredicateLock(context);
     registerBspEvent(superstepStateChanged);
 
-    ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
+    ImmutableClassesGiraphConfiguration<I, V, E> conf =
+        getConfiguration();
 
     maxWorkers = conf.getMaxWorkers();
     minWorkers = conf.getMinWorkers();
@@ -743,6 +743,9 @@ public class BspServiceMaster<I extends WritableComparable,
     GlobalStats globalStats = new GlobalStats();
     globalStats.readFields(finalizedStream);
     updateCounters(globalStats);
+    SuperstepClasses superstepClasses = new SuperstepClasses();
+    superstepClasses.readFields(finalizedStream);
+    getConfiguration().updateSuperstepClasses(superstepClasses);
     int prefixFileCount = finalizedStream.readInt();
     for (int i = 0; i < prefixFileCount; ++i) {
       String metadataFilePath =
@@ -856,10 +859,11 @@ public class BspServiceMaster<I extends WritableComparable,
         if (masterChildArr.get(0).equals(myBid)) {
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
               setValue(getTaskPartition());
-          masterCompute = getConfiguration().createMasterCompute();
           aggregatorHandler = new MasterAggregatorHandler(getConfiguration(),
               getContext());
           aggregatorHandler.initialize(this);
+          masterCompute = getConfiguration().createMasterCompute();
+          masterCompute.setMasterAggregatorUsage(aggregatorHandler);
 
           masterInfo = new MasterInfo();
           masterServer =
@@ -1006,6 +1010,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
     // Format:
     // <global statistics>
+    // <superstep classes>
     // <number of files>
     // <used file prefix 0><used file prefix 1>...
     // <aggregator data>
@@ -1046,7 +1051,7 @@ public class BspServiceMaster<I extends WritableComparable,
   private void assignPartitionOwners(
       List<PartitionStats> allPartitionStatsList,
       List<WorkerInfo> chosenWorkerInfoList,
-      MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) {
+      MasterGraphPartitioner<I, V, E> masterGraphPartitioner) {
     Collection<PartitionOwner> partitionOwners;
     if (getSuperstep() == INPUT_SUPERSTEP ||
         getSuperstep() == getRestartedSuperstep()) {
@@ -1511,7 +1516,7 @@ public class BspServiceMaster<I extends WritableComparable,
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
     aggregatorHandler.prepareSuperstep(masterClient);
-    runMasterCompute(getSuperstep());
+    SuperstepClasses superstepClasses = runMasterCompute(getSuperstep());
 
     // If the master is halted or all the vertices voted to halt and there
     // are no more messages in the system, stop the computation
@@ -1535,12 +1540,15 @@ public class BspServiceMaster<I extends WritableComparable,
       globalStats.setHaltComputation(true);
     }
 
+    superstepClasses.verifyTypesMatch(getConfiguration());
+    getConfiguration().updateSuperstepClasses(superstepClasses);
+
     // Let everyone know the aggregated application state through the
     // superstep finishing znode.
     String superstepFinishedNode =
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
     WritableUtils.writeToZnode(
-        getZkExt(), superstepFinishedNode, -1, globalStats);
+        getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses);
     updateCounters(globalStats);
 
     cleanUpOldSuperstep(getSuperstep() - 1);
@@ -1564,16 +1572,19 @@ public class BspServiceMaster<I extends WritableComparable,
    * Run the master.compute() class
    *
    * @param superstep superstep for which to run the master.compute()
+   * @return Superstep classes set by Master compute
    */
-  private void runMasterCompute(long superstep) {
+  private SuperstepClasses runMasterCompute(long superstep) {
     // The master.compute() should run logically before the workers, so
     // increase the superstep counter it uses by one
-    GraphState<I, V, E, M> graphState =
-        new GraphState<I, V, E, M>(superstep + 1,
-            GiraphStats.getInstance().getVertices().getValue(),
-            GiraphStats.getInstance().getEdges().getValue(),
-            getContext(), getGraphTaskManager(), null, null);
+    GraphState graphState = new GraphState(superstep + 1,
+        GiraphStats.getInstance().getVertices().getValue(),
+        GiraphStats.getInstance().getEdges().getValue(),
+        getContext());
+    SuperstepClasses superstepClasses =
+        new SuperstepClasses(getConfiguration());
     masterCompute.setGraphState(graphState);
+    masterCompute.setSuperstepClasses(superstepClasses);
     if (superstep == INPUT_SUPERSTEP) {
       try {
         masterCompute.initialize();
@@ -1590,6 +1601,7 @@ public class BspServiceMaster<I extends WritableComparable,
     GiraphTimerContext timerContext = masterComputeTimer.time();
     masterCompute.compute();
     timerContext.stop();
+    return superstepClasses;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index f769c3a..0537915 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -59,7 +59,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
   /** Progressable used to report progress */
   private final Progressable progressable;
   /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf;
+  private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 
   /**
    * Constructor
@@ -68,7 +68,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
    * @param progressable Progressable used for reporting progress
    */
   public MasterAggregatorHandler(
-      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
       Progressable progressable) {
     this.conf = conf;
     this.progressable = progressable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 1c36e03..1f244bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -18,9 +18,10 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -36,20 +37,19 @@ import org.apache.hadoop.mapreduce.Mapper;
  * master.compute() is called. This means aggregator values used by the workers
  * are consistent with aggregator values from the master from the same
  * superstep and aggregator used by the master are consistent with aggregator
- * values from the workers from the previous superstep. Note that the master
- * has to register its own aggregators (it does not call {@link WorkerContext}
- * functions), but it uses all aggregators by default, so useAggregator does
- * not have to be called.
+ * values from the workers from the previous superstep.
  */
-@SuppressWarnings("rawtypes")
-public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
-    ImmutableClassesGiraphConfigurable {
+public abstract class MasterCompute
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements MasterAggregatorUsage, Writable {
   /** If true, do not do anymore computation on this vertex. */
   private boolean halt = false;
-  /** Global graph state **/
+  /** Master aggregator usage */
+  private MasterAggregatorUsage masterAggregatorUsage;
+  /** Graph state */
   private GraphState graphState;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
+  /** Computation and Combiner class used, which can be switched by master */
+  private SuperstepClasses superstepClasses;
 
   /**
    * Must be defined by user to specify what the master has to do.
@@ -68,8 +68,8 @@ public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
    *
    * @return Current superstep
    */
-  public long getSuperstep() {
-    return getGraphState().getSuperstep();
+  public final long getSuperstep() {
+    return graphState.getSuperstep();
   }
 
   /**
@@ -78,8 +78,8 @@ public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
    *
    * @return Total number of vertices (-1 if first superstep)
    */
-  public long getTotalNumVertices() {
-    return getGraphState().getTotalNumVertices();
+  public final long getTotalNumVertices() {
+    return graphState.getTotalNumVertices();
   }
 
   /**
@@ -88,15 +88,15 @@ public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
    *
    * @return Total number of edges (-1 if first superstep)
    */
-  public long getTotalNumEdges() {
-    return getGraphState().getTotalNumEdges();
+  public final long getTotalNumEdges() {
+    return graphState.getTotalNumEdges();
   }
 
   /**
    * After this is called, the computation will stop, even if there are
    * still messages in the system or vertices that have not voted to halt.
    */
-  public void haltComputation() {
+  public final void haltComputation() {
     halt = true;
   }
 
@@ -105,43 +105,43 @@ public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
    *
    * @return True if halted, false otherwise.
    */
-  public boolean isHalted() {
+  public final boolean isHalted() {
     return halt;
   }
 
   /**
-   * Get the graph state for all workers.
+   * Get the mapper context
    *
-   * @return Graph state for all workers
+   * @return Mapper context
    */
-  GraphState getGraphState() {
-    return graphState;
+  public final Mapper.Context getContext() {
+    return graphState.getContext();
   }
 
   /**
-   * Set the graph state for all workers
+   * Set Computation class to be used
    *
-   * @param graphState Graph state for all workers
+   * @param computationClass Computation class
    */
-  void setGraphState(GraphState graphState) {
-    this.graphState = graphState;
+  public final void setComputation(
+      Class<? extends Computation> computationClass) {
+    superstepClasses.setComputationClass(computationClass);
   }
 
   /**
-   * Get the mapper context
+   * Set Combiner class to be used
    *
-   * @return Mapper context
+   * @param combinerClass Combiner class
    */
-  public Mapper.Context getContext() {
-    return getGraphState().getContext();
+  public final void setCombiner(Class<? extends Combiner> combinerClass) {
+    superstepClasses.setCombinerClass(combinerClass);
   }
 
   @Override
   public final <A extends Writable> boolean registerAggregator(
     String name, Class<? extends Aggregator<A>> aggregatorClass)
     throws InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphTaskManager().getMasterAggregatorUsage().
-        registerAggregator(name, aggregatorClass);
+    return masterAggregatorUsage.registerAggregator(name, aggregatorClass);
   }
 
   @Override
@@ -149,29 +149,31 @@ public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
       String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphTaskManager().getMasterAggregatorUsage().
-        registerPersistentAggregator(name, aggregatorClass);
+    return masterAggregatorUsage.registerPersistentAggregator(
+        name, aggregatorClass);
   }
 
   @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return getGraphState().getGraphTaskManager().getMasterAggregatorUsage().
-        <A>getAggregatedValue(name);
+  public final <A extends Writable> A getAggregatedValue(String name) {
+    return masterAggregatorUsage.<A>getAggregatedValue(name);
   }
 
   @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    getGraphState().getGraphTaskManager().getMasterAggregatorUsage().
-        setAggregatedValue(name, value);
+  public final <A extends Writable> void setAggregatedValue(
+      String name, A value) {
+    masterAggregatorUsage.setAggregatedValue(name, value);
   }
 
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
+  final void setGraphState(GraphState graphState) {
+    this.graphState = graphState;
   }
 
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
+  final void setMasterAggregatorUsage(MasterAggregatorUsage
+      masterAggregatorUsage) {
+    this.masterAggregatorUsage = masterAggregatorUsage;
+  }
+
+  final void setSuperstepClasses(SuperstepClasses superstepClasses) {
+    this.superstepClasses = superstepClasses;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index ba2f8eb..e8eeeed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -43,17 +43,16 @@ import static org.apache.giraph.conf.GiraphConstants.USE_SUPERSTEP_COUNTERS;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class MasterThread<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> extends Thread {
+    E extends Writable> extends Thread {
   /** Counter group name for the Giraph timers */
   public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(MasterThread.class);
   /** Reference to shared BspService */
-  private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
+  private CentralizedServiceMaster<I, V, E> bspServiceMaster = null;
   /** Context (for counters) */
   private final Context context;
   /** Use superstep counters? */
@@ -71,7 +70,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
    *        been called.
    * @param context Context from the Mapper.
    */
-  public MasterThread(CentralizedServiceMaster<I, V, E, M> bspServiceMaster,
+  public MasterThread(CentralizedServiceMaster<I, V, E> bspServiceMaster,
       Context context) {
     super(MasterThread.class.getName());
     this.bspServiceMaster = bspServiceMaster;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
new file mode 100644
index 0000000..a12ef58
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Modifier;
+import java.util.List;
+
+/**
+ * Holds Computation and Combiner class.
+ */
+public class SuperstepClasses implements Writable {
+  /** Computation class to be used in the following superstep */
+  private Class<? extends Computation> computationClass;
+  /** Combiner class to be used in the following superstep */
+  private Class<? extends Combiner> combinerClass;
+
+  /**
+   * Default constructor
+   */
+  public SuperstepClasses() {
+  }
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   */
+  @SuppressWarnings("unchecked")
+  public SuperstepClasses(ImmutableClassesGiraphConfiguration conf) {
+    this(conf.getComputationClass(), conf.getCombinerClass());
+  }
+
+  /**
+   * Constructor
+   *
+   * @param computationClass Computation class
+   * @param combinerClass Combiner class
+   */
+  public SuperstepClasses(Class<? extends Computation> computationClass,
+      Class<? extends Combiner> combinerClass) {
+    this.computationClass = computationClass;
+    this.combinerClass = combinerClass;
+  }
+
+  public Class<? extends Computation> getComputationClass() {
+    return computationClass;
+  }
+
+  public Class<? extends Combiner> getCombinerClass() {
+    return combinerClass;
+  }
+
+  public void setComputationClass(
+      Class<? extends Computation> computationClass) {
+    this.computationClass = computationClass;
+  }
+
+  public void setCombinerClass(Class<? extends Combiner> combinerClass) {
+    this.combinerClass = combinerClass;
+  }
+
+  /**
+   * Verify that types of current Computation and Combiner are valid. If types
+   * don't match an {@link IllegalStateException} will be thrown.
+   *
+   * @param conf Configuration to verify this with
+   */
+  public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf) {
+    List<Class<?>> computationTypes = ReflectionUtils.getTypeArguments(
+        Computation.class, computationClass);
+    verifyTypes(conf.getVertexIdClass(), computationTypes.get(0),
+        "Vertex id", computationClass);
+    verifyTypes(conf.getVertexValueClass(), computationTypes.get(1),
+        "Vertex value", computationClass);
+    verifyTypes(conf.getEdgeValueClass(), computationTypes.get(2),
+        "Edge value", computationClass);
+    verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes.get(3),
+        "Previous outgoing and new incoming message", computationClass);
+    Class<?> outgoingMessageType = computationTypes.get(4);
+    if (outgoingMessageType.isInterface()) {
+      throw new IllegalStateException("verifyTypesMatch: " +
+          "Message type must be concrete class " + outgoingMessageType);
+    }
+    if (Modifier.isAbstract(outgoingMessageType.getModifiers())) {
+      throw new IllegalStateException("verifyTypesMatch: " +
+          "Message type can't be abstract class" + outgoingMessageType);
+    }
+    if (combinerClass != null) {
+      List<Class<?>> combinerTypes = ReflectionUtils.getTypeArguments(
+          Combiner.class, combinerClass);
+      verifyTypes(conf.getVertexIdClass(), combinerTypes.get(0),
+          "Vertex id", combinerClass);
+      verifyTypes(outgoingMessageType, combinerTypes.get(1),
+          "Outgoing message", combinerClass);
+    }
+  }
+
+  /**
+   * Verify that found type matches the expected type. If types don't match an
+   * {@link IllegalStateException} will be thrown.
+   *
+   * @param expected Expected type
+   * @param actual Actual type
+   * @param typeDesc String description of the type (for exception description)
+   * @param mainClass Class in which the actual type was found (for exception
+   *                  description)
+   */
+  private void verifyTypes(Class<?> expected, Class<?> actual,
+      String typeDesc, Class<?> mainClass) {
+    if (!expected.equals(actual)) {
+      throw new IllegalStateException("verifyTypes: " + typeDesc + " types " +
+          "don't match, in " + mainClass.getName() + " " + expected +
+          " expected, but " + actual + " found");
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    WritableUtils.writeClass(computationClass, output);
+    WritableUtils.writeClass(combinerClass, output);
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    computationClass = WritableUtils.readClass(input);
+    combinerClass = WritableUtils.readClass(input);
+  }
+
+  @Override
+  public String toString() {
+    return "(computation=" + computationClass.getName() + ",combiner=" +
+        ((combinerClass == null) ? "null" : combinerClass.getName()) + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
index dc9192e..f2b8552 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
@@ -34,35 +34,31 @@ import java.io.IOException;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public abstract class BasicPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements Partition<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements Partition<I, V, E> {
   /** Configuration from the worker */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Partition id */
   private int id;
   /** Context used to report progress */
   private Progressable progressable;
-  /** Partition context */
-  private PartitionContext partitionContext;
 
   @Override
   public void initialize(int partitionId, Progressable progressable) {
     setId(partitionId);
     setProgressable(progressable);
-    partitionContext = conf.createPartitionContext();
   }
 
   @Override
   public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
     conf = configuration;
   }
 
   @Override
-  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+  public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
     return conf;
   }
 
@@ -77,11 +73,6 @@ public abstract class BasicPartition<I extends WritableComparable,
   }
 
   @Override
-  public PartitionContext getPartitionContext() {
-    return partitionContext;
-  }
-
-  @Override
   public void progress() {
     if (progressable != null) {
       progressable.progress();
@@ -101,6 +92,5 @@ public abstract class BasicPartition<I extends WritableComparable,
   @Override
   public void readFields(DataInput input) throws IOException {
     id = input.readInt();
-    partitionContext = conf.createPartitionContext();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index c4669d3..6eaa6d7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -42,12 +42,11 @@ import java.util.concurrent.ConcurrentMap;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class ByteArrayPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends BasicPartition<I, V, E, M>
-    implements ReusesObjectsPartition<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends BasicPartition<I, V, E>
+    implements ReusesObjectsPartition<I, V, E> {
   /**
    * Vertex map for this range (keyed by index).  Note that the byte[] is a
    * serialized vertex with the first four bytes as the length of the vertex
@@ -55,7 +54,7 @@ public class ByteArrayPartition<I extends WritableComparable,
    */
   private ConcurrentMap<I, byte[]> vertexMap;
   /** Representative vertex */
-  private Vertex<I, V, E, M> representativeVertex;
+  private Vertex<I, V, E> representativeVertex;
   /** Use unsafe serialization */
   private boolean useUnsafeSerialization;
 
@@ -78,7 +77,7 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+  public Vertex<I, V, E> getVertex(I vertexIndex) {
     byte[] vertexData = vertexMap.get(vertexIndex);
     if (vertexData == null) {
       return null;
@@ -89,7 +88,7 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+  public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
     byte[] vertexData =
         WritableUtils.writeVertexToByteArray(
             vertex, useUnsafeSerialization, getConf());
@@ -104,7 +103,7 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+  public Vertex<I, V, E> removeVertex(I vertexIndex) {
     byte[] vertexBytes = vertexMap.remove(vertexIndex);
     if (vertexBytes == null) {
       return null;
@@ -115,15 +114,15 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
+  public void addPartition(Partition<I, V, E> partition) {
     // Only work with other ByteArrayPartition instances
     if (!(partition instanceof ByteArrayPartition)) {
       throw new IllegalStateException("addPartition: Cannot add partition " +
           "of type " + partition.getClass());
     }
 
-    ByteArrayPartition<I, V, E, M> byteArrayPartition =
-        (ByteArrayPartition<I, V, E, M>) partition;
+    ByteArrayPartition<I, V, E> byteArrayPartition =
+        (ByteArrayPartition<I, V, E>) partition;
     for (Map.Entry<I, byte[]> entry :
         byteArrayPartition.vertexMap.entrySet()) {
       vertexMap.put(entry.getKey(), entry.getValue());
@@ -147,7 +146,7 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public void saveVertex(Vertex<I, V, E, M> vertex) {
+  public void saveVertex(Vertex<I, V, E> vertex) {
     // Reuse the old buffer whenever possible
     byte[] oldVertexData = vertexMap.get(vertex.getId());
     if (oldVertexData != null) {
@@ -211,7 +210,7 @@ public class ByteArrayPartition<I extends WritableComparable,
   }
 
   @Override
-  public Iterator<Vertex<I, V, E, M>> iterator() {
+  public Iterator<Vertex<I, V, E>> iterator() {
     return new RepresentativeVertexIterator();
   }
 
@@ -220,7 +219,7 @@ public class ByteArrayPartition<I extends WritableComparable,
    * the same representative vertex object.
    */
   private class RepresentativeVertexIterator implements
-      Iterator<Vertex<I, V, E, M>> {
+      Iterator<Vertex<I, V, E>> {
     /** Iterator to the vertex values */
     private Iterator<byte[]> vertexDataIterator =
         vertexMap.values().iterator();
@@ -231,7 +230,7 @@ public class ByteArrayPartition<I extends WritableComparable,
     }
 
     @Override
-    public Vertex<I, V, E, M> next() {
+    public Vertex<I, V, E> next() {
       WritableUtils.reinitializeVertexFromByteArray(
           vertexDataIterator.next(), representativeVertex,
           useUnsafeSerialization, getConf());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
deleted file mode 100644
index c22c802..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.giraph.worker.WorkerContext;
-
-/**
- * Empty implementation of {@link PartitionContext}
- */
-public class DefaultPartitionContext implements PartitionContext {
-  @Override
-  public void preSuperstep(WorkerContext workerContext) {
-  }
-
-  @Override
-  public void postSuperstep(WorkerContext workerContext) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index a4739f1..dadce0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -69,12 +69,11 @@ import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class DiskBackedPartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends PartitionStore<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends PartitionStore<I, V, E> {
   /** Class logger. */
   private static final Logger LOG =
       Logger.getLogger(DiskBackedPartitionStore.class);
@@ -92,9 +91,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   /** Partitions' states store */
   private final Map<Integer, State> states = Maps.newHashMap();
   /** Current active partitions, which have not been put back yet */
-  private final Map<Integer, Partition<I, V, E, M>> active = Maps.newHashMap();
+  private final Map<Integer, Partition<I, V, E>> active = Maps.newHashMap();
   /** Inactive partitions to re-activate or spill to disk to make space */
-  private final Map<Integer, Partition<I, V, E, M>> inactive =
+  private final Map<Integer, Partition<I, V, E>> inactive =
       Maps.newLinkedHashMap();
   /** Ids of partitions stored on disk and number of vertices contained */
   private final Map<Integer, Integer> onDisk = Maps.newHashMap();
@@ -110,7 +109,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   /** Executors for users requests. Uses caller threads */
   private final ExecutorService pool = new DirectExecutorService();
   /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private final
+  ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Mapper context */
   private final Context context;
   /** Base path where the partition files are written to */
@@ -129,7 +129,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @param context Context
    */
   public DiskBackedPartitionStore(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
     this.context = context;
@@ -222,7 +222,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public Partition<I, V, E, M> getPartition(Integer id) {
+  public Partition<I, V, E> getPartition(Integer id) {
     try {
       return pool.submit(new GetPartition(id)).get();
     } catch (InterruptedException e) {
@@ -235,7 +235,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public void putPartition(Partition<I, V, E, M> partition) {
+  public void putPartition(Partition<I, V, E> partition) {
     Integer id = partition.getId();
     try {
       pool.submit(new PutPartition(id, partition)).get();
@@ -262,8 +262,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public Partition<I, V, E, M> removePartition(Integer id) {
-    Partition<I, V, E, M> partition = getPartition(id);
+  public Partition<I, V, E> removePartition(Integer id) {
+    Partition<I, V, E> partition = getPartition(id);
     // we put it back, so the partition can turn INACTIVE and be deleted.
     putPartition(partition);
     deletePartition(id);
@@ -271,7 +271,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
+  public void addPartition(Partition<I, V, E> partition) {
     Integer id = partition.getId();
     try {
       pool.submit(new AddPartition(partition.getId(), partition)).get();
@@ -307,11 +307,11 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     StringBuilder sb = new StringBuilder();
     sb.append(partitionIds.toString());
     sb.append("\nActive\n");
-    for (Entry<Integer, Partition<I, V, E, M>> e : active.entrySet()) {
+    for (Entry<Integer, Partition<I, V, E>> e : active.entrySet()) {
       sb.append(e.getKey() + ":" + e.getValue() + "\n");
     }
     sb.append("Inactive\n");
-    for (Entry<Integer, Partition<I, V, E, M>> e : inactive.entrySet()) {
+    for (Entry<Integer, Partition<I, V, E>> e : inactive.entrySet()) {
       sb.append(e.getKey() + ":" + e.getValue() + "\n");
     }
     sb.append("OnDisk\n");
@@ -368,7 +368,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @param vertex The vertex to serialize
    * @throws IOException
    */
-  private void writeVertexData(DataOutput output, Vertex<I, V, E, M> vertex)
+  private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
     throws IOException {
     vertex.getId().write(output);
     vertex.getValue().write(output);
@@ -383,7 +383,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  private void writeOutEdges(DataOutput output, Vertex<I, V, E, M> vertex)
+  private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
     throws IOException {
     vertex.getId().write(output);
     ((OutEdges<I, E>) vertex.getEdges()).write(output);
@@ -396,7 +396,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @param vertex The vertex to initialize
    * @throws IOException
    */
-  private void readVertexData(DataInput in, Vertex<I, V, E, M> vertex)
+  private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
     throws IOException {
     I id = conf.createVertexId();
     id.readFields(in);
@@ -419,11 +419,11 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  private void readOutEdges(DataInput in, Partition<I, V, E, M> partition)
+  private void readOutEdges(DataInput in, Partition<I, V, E> partition)
     throws IOException {
     I id = conf.createVertexId();
     id.readFields(in);
-    Vertex<I, V, E, M> v = partition.getVertex(id);
+    Vertex<I, V, E> v = partition.getVertex(id);
     ((OutEdges<I, E>) v.getEdges()).readFields(in);
   }
 
@@ -437,9 +437,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @return The partition
    * @throws IOException
    */
-  private Partition<I, V, E, M> loadPartition(Integer id, int numVertices)
+  private Partition<I, V, E> loadPartition(Integer id, int numVertices)
     throws IOException {
-    Partition<I, V, E, M> partition =
+    Partition<I, V, E> partition =
         conf.createPartition(id, context);
     File file = new File(getVerticesPath(id));
     if (LOG.isDebugEnabled()) {
@@ -451,7 +451,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       inputStream = new DataInputStream(
           new BufferedInputStream(new FileInputStream(file)));
       for (int i = 0; i < numVertices; ++i) {
-        Vertex<I, V , E, M> vertex = conf.createVertex();
+        Vertex<I, V , E> vertex = conf.createVertex();
         readVertexData(inputStream, vertex);
         partition.putVertex(vertex);
       }
@@ -493,7 +493,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @param partition The partition to offload
    * @throws IOException
    */
-  private void offloadPartition(Partition<I, V, E, M> partition)
+  private void offloadPartition(Partition<I, V, E> partition)
     throws IOException {
     File file = new File(getVerticesPath(partition.getId()));
     file.getParentFile().mkdirs();
@@ -506,7 +506,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     try {
       outputStream = new DataOutputStream(
           new BufferedOutputStream(new FileOutputStream(file)));
-      for (Vertex<I, V, E, M> vertex : partition) {
+      for (Vertex<I, V, E> vertex : partition) {
         writeVertexData(outputStream, vertex);
       }
     } finally {
@@ -529,7 +529,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       try {
         outputStream = new DataOutputStream(
             new BufferedOutputStream(new FileOutputStream(file)));
-        for (Vertex<I, V, E, M> vertex : partition) {
+        for (Vertex<I, V, E> vertex : partition) {
           writeOutEdges(outputStream, vertex);
         }
       } finally {
@@ -547,7 +547,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @param partition The partition
    * @throws IOException
    */
-  private void addToOOCPartition(Partition<I, V, E, M> partition)
+  private void addToOOCPartition(Partition<I, V, E> partition)
     throws IOException {
     Integer id = partition.getId();
     Integer count = onDisk.get(id);
@@ -557,7 +557,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     try {
       outputStream = new DataOutputStream(
           new BufferedOutputStream(new FileOutputStream(file, true)));
-      for (Vertex<I, V, E, M> vertex : partition) {
+      for (Vertex<I, V, E> vertex : partition) {
         writeVertexData(outputStream, vertex);
       }
     } finally {
@@ -570,7 +570,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     try {
       outputStream = new DataOutputStream(
           new BufferedOutputStream(new FileOutputStream(file, true)));
-      for (Vertex<I, V, E, M> vertex : partition) {
+      for (Vertex<I, V, E> vertex : partition) {
         writeOutEdges(outputStream, vertex);
       }
     } finally {
@@ -627,7 +627,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   /**
    * Task that gets a partition from the store
    */
-  private class GetPartition implements Callable<Partition<I, V, E, M>> {
+  private class GetPartition implements Callable<Partition<I, V, E>> {
     /** Partition id */
     private Integer id;
 
@@ -645,17 +645,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
      *
      * @return The last recently used entry.
      */
-    private Entry<Integer, Partition<I, V, E, M>> getLRUEntry() {
-      Iterator<Entry<Integer, Partition<I, V, E, M>>> i =
+    private Entry<Integer, Partition<I, V, E>> getLRUEntry() {
+      Iterator<Entry<Integer, Partition<I, V, E>>> i =
           inactive.entrySet().iterator();
-      Entry<Integer, Partition<I, V, E, M>> lruEntry = i.next();
+      Entry<Integer, Partition<I, V, E>> lruEntry = i.next();
       i.remove();
       return lruEntry;
     }
 
     @Override
-    public Partition<I, V, E, M> call() throws Exception {
-      Partition<I, V, E, M> partition = null;
+    public Partition<I, V, E> call() throws Exception {
+      Partition<I, V, E> partition = null;
 
       while (partition == null) {
         wLock.lock();
@@ -663,7 +663,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
           State pState = states.get(id);
           switch (pState) {
           case ONDISK:
-            Entry<Integer, Partition<I, V, E, M>> lru = null;
+            Entry<Integer, Partition<I, V, E>> lru = null;
             states.put(id, State.LOADING);
             int numVertices = onDisk.remove(id);
             /*
@@ -747,7 +747,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
      * @param id The partition id
      * @param partition The partition
      */
-    public PutPartition(Integer id, Partition<I, V, E, M> partition) {
+    public PutPartition(Integer id, Partition<I, V, E> partition) {
       this.id = id;
     }
 
@@ -775,7 +775,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     /** Partition id */
     private Integer id;
     /** Partition */
-    private Partition<I, V, E, M> partition;
+    private Partition<I, V, E> partition;
 
     /**
      * Constructor
@@ -783,7 +783,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
      * @param id The partition id
      * @param partition The partition
      */
-    public AddPartition(Integer id, Partition<I, V, E, M> partition) {
+    public AddPartition(Integer id, Partition<I, V, E> partition) {
       this.id = id;
       this.partition = partition;
     }
@@ -794,7 +794,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       wLock.lock();
       try {
         if (partitionIds.contains(id)) {
-          Partition<I, V, E, M> existing = null;
+          Partition<I, V, E> existing = null;
           boolean isOOC = false;
           boolean done  = false;
           while (!done) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
index e2e04dd..4200d79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -28,11 +28,10 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public interface GraphPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
+    V extends Writable, E extends Writable> extends
     ImmutableClassesGiraphConfigurable {
   /**
    * Create the {@link MasterGraphPartitioner} used by the master.
@@ -40,7 +39,7 @@ public interface GraphPartitionerFactory<I extends WritableComparable,
    *
    * @return Instantiated master graph partitioner
    */
-  MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner();
+  MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner();
 
   /**
    * Create the {@link WorkerGraphPartitioner} used by the worker.
@@ -48,5 +47,5 @@ public interface GraphPartitionerFactory<I extends WritableComparable,
    *
    * @return Instantiated worker graph partitioner
    */
-  WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner();
+  WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index 5faf367..240687e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -35,12 +35,11 @@ import java.util.List;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public class HashMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MasterGraphPartitioner<I, V, E, M> {
+    V extends Writable, E extends Writable> implements
+    MasterGraphPartitioner<I, V, E> {
   /** Class logger */
   private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
   /** Provided configuration */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
index f7343a1..7cc5651 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -29,23 +29,22 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public class HashPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements GraphPartitionerFactory<I, V, E> {
   /** Saved configuration */
   private ImmutableClassesGiraphConfiguration conf;
 
   @Override
-  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
-    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E>(getConf());
   }
 
   @Override
-  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
-    return new HashWorkerPartitioner<I, V, E, M>();
+  public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
+    return new HashWorkerPartitioner<I, V, E>();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
index 227e234..1eeece7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -29,23 +29,22 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public class HashRangePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements GraphPartitionerFactory<I, V, E> {
   /** Saved configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
 
   @Override
-  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
-    return new HashMasterPartitioner<I, V, E, M>(getConf());
+  public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
+    return new HashMasterPartitioner<I, V, E>(getConf());
   }
 
   @Override
-  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
-    return new HashRangeWorkerPartitioner<I, V, E, M>();
+  public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
+    return new HashRangeWorkerPartitioner<I, V, E>();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
index a6e764d..81c3d7d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
@@ -29,12 +29,11 @@ import com.google.common.primitives.UnsignedInts;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public class HashRangeWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends HashWorkerPartitioner<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends HashWorkerPartitioner<I, V, E> {
   /** A transformed hashCode() must be strictly smaller than this. */
   private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
index 599ea0c..d833895 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
@@ -32,12 +32,11 @@ import java.util.List;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public class HashWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements WorkerGraphPartitioner<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements WorkerGraphPartitioner<I, V, E> {
   /**
    * Mapping of the vertex ids to {@link PartitionOwner}.
    */
@@ -58,7 +57,7 @@ public class HashWorkerPartitioner<I extends WritableComparable,
   @Override
   public Collection<PartitionStats> finalizePartitionStats(
       Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E, M> partitionStore) {
+      PartitionStore<I, V, E> partitionStore) {
     // No modification necessary
     return workerPartitionStats;
   }
@@ -67,7 +66,7 @@ public class HashWorkerPartitioner<I extends WritableComparable,
   public PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E, M> partitionStore) {
+      PartitionStore<I, V, E> partitionStore) {
     return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
         myWorkerInfo, masterSetPartitionOwners, partitionStore);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
index 130ee07..50c750a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java
@@ -31,11 +31,10 @@ import org.apache.giraph.worker.WorkerInfo;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public interface MasterGraphPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /**
    * Set some initial partition owners for the graph. Guaranteed to be called
    * prior to the graph being loaded (initial or restart).

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index 1ca0b61..aebd343 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -32,13 +32,12 @@ import org.apache.hadoop.util.Progressable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface Partition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>,
-    Iterable<Vertex<I, V, E, M>> {
+    V extends Writable, E extends Writable>
+    extends Writable, ImmutableClassesGiraphConfigurable<I, V, E>,
+    Iterable<Vertex<I, V, E>> {
   /**
    * Initialize the partition.  Guaranteed to be called before used.
    *
@@ -53,7 +52,7 @@ public interface Partition<I extends WritableComparable,
    * @param vertexIndex Vertex index to search for
    * @return Vertex if it exists, null otherwise
    */
-  Vertex<I, V, E, M> getVertex(I vertexIndex);
+  Vertex<I, V, E> getVertex(I vertexIndex);
 
   /**
    * Put a vertex into the Partition
@@ -61,7 +60,7 @@ public interface Partition<I extends WritableComparable,
    * @param vertex Vertex to put in the Partition
    * @return old vertex value (i.e. null if none existed prior)
    */
-  Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex);
+  Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex);
 
   /**
    * Remove a vertex from the Partition
@@ -69,14 +68,14 @@ public interface Partition<I extends WritableComparable,
    * @param vertexIndex Vertex index to remove
    * @return The removed vertex.
    */
-  Vertex<I, V, E, M> removeVertex(I vertexIndex);
+  Vertex<I, V, E> removeVertex(I vertexIndex);
 
   /**
    * Add a partition's vertices
    *
    * @param partition Partition to add
    */
-  void addPartition(Partition<I, V, E, M> partition);
+  void addPartition(Partition<I, V, E> partition);
 
   /**
    * Get the number of vertices in this partition
@@ -123,12 +122,5 @@ public interface Partition<I extends WritableComparable,
    *
    * @param vertex Vertex to save
    */
-  void saveVertex(Vertex<I, V, E, M> vertex);
-
-  /**
-   * Get partition context
-   *
-   * @return Partition context
-   */
-  PartitionContext getPartitionContext();
+  void saveVertex(Vertex<I, V, E> vertex);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
deleted file mode 100644
index 412f6e3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.giraph.worker.WorkerContext;
-
-/**
- * PartitionContext allows for the execution of user code
- * on a per-partition basis. There's one PartitionContext per partition.
- */
-public interface PartitionContext {
-  /**
-   * Execute user code.
-   * This method is executed once for each partition before computation for
-   * that partition starts.
-   *
-   * @param workerContext Worker context
-   */
-  void preSuperstep(WorkerContext workerContext);
-
-  /**
-   * Execute user code.
-   * This method is executed once on for each partition after computation in
-   * current superstep for that partition ends.
-   *
-   * @param workerContext Worker context
-   */
-  void postSuperstep(WorkerContext workerContext);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 4206ce3..763397e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -27,10 +27,9 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 public abstract class PartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
 
   /**
    * Add a new partition to the store or just the vertices from the partition
@@ -38,16 +37,16 @@ public abstract class PartitionStore<I extends WritableComparable,
    *
    * @param partition Partition to add
    */
-  public abstract void addPartition(Partition<I, V, E, M> partition);
+  public abstract void addPartition(Partition<I, V, E> partition);
 
   /**
    * Get a partition. Note: user has to put back it to the store through
-   * {@link #putPartition(Integer, Partition)} after use.
+   * {@link #putPartition(Partition)} after use.
    *
    * @param partitionId Partition id
    * @return The requested partition
    */
-  public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
+  public abstract Partition<I, V, E> getPartition(Integer partitionId);
 
   /**
    * Put a partition back to the store. Use this method to be put a partition
@@ -55,7 +54,7 @@ public abstract class PartitionStore<I extends WritableComparable,
    *
    * @param partition Partition
    */
-  public abstract void putPartition(Partition<I, V, E, M> partition);
+  public abstract void putPartition(Partition<I, V, E> partition);
 
   /**
    * Remove a partition and return it.
@@ -63,7 +62,7 @@ public abstract class PartitionStore<I extends WritableComparable,
    * @param partitionId Partition id
    * @return The removed partition
    */
-  public abstract Partition<I, V, E, M> removePartition(Integer partitionId);
+  public abstract Partition<I, V, E> removePartition(Integer partitionId);
 
   /**
    * Just delete a partition

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
index f9b0329..3911a95 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public abstract class RangeMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MasterGraphPartitioner<I, V, E, M> {
+    V extends Writable, E extends Writable> implements
+    MasterGraphPartitioner<I, V, E> {
   @Override
   public PartitionStats createPartitionStats() {
     return new RangePartitionStats<I>();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
index 29f7898..2ec4d4a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public abstract class RangePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements GraphPartitionerFactory<I, V, E> {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
index 9634c33..4317944 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
@@ -43,7 +43,7 @@ public class RangeSplitHint<I extends WritableComparable>
   /** Number of vertices in this range after the split */
   private long postSplitVertexCount;
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> conf;
+  private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 
   @Override
   public void readFields(DataInput input) throws IOException {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
index 5a494a5..cbcd753 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
@@ -39,12 +39,11 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
 public abstract class RangeWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    WorkerGraphPartitioner<I, V, E, M> {
+    V extends Writable, E extends Writable> implements
+    WorkerGraphPartitioner<I, V, E> {
   /** Mapping of the vertex ids to the {@link PartitionOwner} */
   protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
       new TreeMap<I, RangePartitionOwner<I>>();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/ReusesObjectsPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ReusesObjectsPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ReusesObjectsPartition.java
index 4153d55..405bc5c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ReusesObjectsPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ReusesObjectsPartition.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message value
  */
 public interface ReusesObjectsPartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Partition<I, V, E, M> { }
+    V extends Writable, E extends Writable> extends Partition<I, V, E> { }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 9ac2e11..7aee84c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -30,26 +30,25 @@ import org.apache.hadoop.io.Writable;
  *
  * @param <V> Vertex value type
  * @param <E> Edge value type
- * @param <M> Message data type
  */
 public class SimpleIntRangePartitionerFactory<V extends Writable,
-    E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<IntWritable, V, E, M> {
+    E extends Writable>
+    implements GraphPartitionerFactory<IntWritable, V, E> {
   /** Configuration. */
   private ImmutableClassesGiraphConfiguration conf;
   /** Vertex key space size. */
   private long keySpaceSize;
 
   @Override
-  public MasterGraphPartitioner<IntWritable, V, E, M>
+  public MasterGraphPartitioner<IntWritable, V, E>
   createMasterGraphPartitioner() {
-    return new SimpleRangeMasterPartitioner<IntWritable, V, E, M>(conf);
+    return new SimpleRangeMasterPartitioner<IntWritable, V, E>(conf);
   }
 
   @Override
-  public WorkerGraphPartitioner<IntWritable, V, E, M>
+  public WorkerGraphPartitioner<IntWritable, V, E>
   createWorkerGraphPartitioner() {
-    return new SimpleRangeWorkerPartitioner<IntWritable, V, E, M>(
+    return new SimpleRangeWorkerPartitioner<IntWritable, V, E>(
         keySpaceSize) {
       @Override
       protected long vertexKeyFromId(IntWritable id) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index 5772a7b..64efde9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -30,26 +30,25 @@ import org.apache.hadoop.io.Writable;
  *
  * @param <V> Vertex value type
  * @param <E> Edge value type
- * @param <M> Message data type
  */
 public class SimpleLongRangePartitionerFactory<V extends Writable,
-    E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<LongWritable, V, E, M> {
+    E extends Writable>
+    implements GraphPartitionerFactory<LongWritable, V, E> {
   /** Configuration. */
   private ImmutableClassesGiraphConfiguration conf;
   /** Vertex key space size. */
   private long keySpaceSize;
 
   @Override
-  public MasterGraphPartitioner<LongWritable, V, E, M>
+  public MasterGraphPartitioner<LongWritable, V, E>
   createMasterGraphPartitioner() {
-    return new SimpleRangeMasterPartitioner<LongWritable, V, E, M>(conf);
+    return new SimpleRangeMasterPartitioner<LongWritable, V, E>(conf);
   }
 
   @Override
-  public WorkerGraphPartitioner<LongWritable, V, E, M>
+  public WorkerGraphPartitioner<LongWritable, V, E>
   createWorkerGraphPartitioner() {
-    return new SimpleRangeWorkerPartitioner<LongWritable, V, E, M>(
+    return new SimpleRangeWorkerPartitioner<LongWritable, V, E>(
         keySpaceSize) {
       @Override
       protected long vertexKeyFromId(LongWritable id) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index d6a46bd..0c1b404 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -42,14 +42,13 @@ import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class SimplePartition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends BasicPartition<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends BasicPartition<I, V, E> {
   /** Vertex map for this range (keyed by index) */
-  private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
+  private ConcurrentMap<I, Vertex<I, V, E>> vertexMap;
 
   /**
    * Constructor for reflection.
@@ -60,30 +59,30 @@ public class SimplePartition<I extends WritableComparable,
   public void initialize(int partitionId, Progressable progressable) {
     super.initialize(partitionId, progressable);
     if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
-      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
     }
   }
 
   @Override
-  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+  public Vertex<I, V, E> getVertex(I vertexIndex) {
     return vertexMap.get(vertexIndex);
   }
 
   @Override
-  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+  public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
     return vertexMap.put(vertex.getId(), vertex);
   }
 
   @Override
-  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+  public Vertex<I, V, E> removeVertex(I vertexIndex) {
     return vertexMap.remove(vertexIndex);
   }
 
   @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
-    for (Vertex<I, V, E , M> vertex : partition) {
+  public void addPartition(Partition<I, V, E> partition) {
+    for (Vertex<I, V, E> vertex : partition) {
       vertexMap.put(vertex.getId(), vertex);
     }
   }
@@ -96,14 +95,14 @@ public class SimplePartition<I extends WritableComparable,
   @Override
   public long getEdgeCount() {
     long edges = 0;
-    for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
+    for (Vertex<I, V, E> vertex : vertexMap.values()) {
       edges += vertex.getNumEdges();
     }
     return edges;
   }
 
   @Override
-  public void saveVertex(Vertex<I, V, E, M> vertex) {
+  public void saveVertex(Vertex<I, V, E> vertex) {
     // No-op, vertices are stored as Java objects in this partition
   }
 
@@ -116,14 +115,14 @@ public class SimplePartition<I extends WritableComparable,
   public void readFields(DataInput input) throws IOException {
     super.readFields(input);
     if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) {
-      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>();
     } else {
       vertexMap = Maps.newConcurrentMap();
     }
     int vertices = input.readInt();
     for (int i = 0; i < vertices; ++i) {
       progress();
-      Vertex<I, V, E, M> vertex =
+      Vertex<I, V, E> vertex =
           WritableUtils.readVertexFromDataInput(input, getConf());
       if (vertexMap.put(vertex.getId(), vertex) != null) {
         throw new IllegalStateException(
@@ -137,14 +136,14 @@ public class SimplePartition<I extends WritableComparable,
   public void write(DataOutput output) throws IOException {
     super.write(output);
     output.writeInt(vertexMap.size());
-    for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
+    for (Vertex<I, V, E> vertex : vertexMap.values()) {
       progress();
       WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
     }
   }
 
   @Override
-  public Iterator<Vertex<I, V, E, M>> iterator() {
+  public Iterator<Vertex<I, V, E>> iterator() {
     return vertexMap.values().iterator();
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index 74cc3a7..ae17aac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -33,16 +33,15 @@ import java.util.concurrent.ConcurrentMap;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 public class SimplePartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends PartitionStore<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    extends PartitionStore<I, V, E> {
   /** Map of stored partitions. */
-  private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
+  private final ConcurrentMap<Integer, Partition<I, V, E>> partitions =
       Maps.newConcurrentMap();
   /** Configuration. */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Context used to report progress */
   private final Mapper<?, ?, ?, ?>.Context context;
 
@@ -53,15 +52,15 @@ public class SimplePartitionStore<I extends WritableComparable,
    * @param context Mapper context
    */
   public SimplePartitionStore(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
     this.context = context;
   }
 
   @Override
-  public void addPartition(Partition<I, V, E, M> partition) {
-    Partition<I, V, E, M> oldPartition = partitions.get(partition.getId());
+  public void addPartition(Partition<I, V, E> partition) {
+    Partition<I, V, E> oldPartition = partitions.get(partition.getId());
     if (oldPartition == null) {
       oldPartition = partitions.putIfAbsent(partition.getId(), partition);
       if (oldPartition == null) {
@@ -72,12 +71,12 @@ public class SimplePartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public Partition<I, V, E, M> getPartition(Integer partitionId) {
+  public Partition<I, V, E> getPartition(Integer partitionId) {
     return partitions.get(partitionId);
   }
 
   @Override
-  public Partition<I, V, E, M> removePartition(Integer partitionId) {
+  public Partition<I, V, E> removePartition(Integer partitionId) {
     return partitions.remove(partitionId);
   }
 
@@ -102,5 +101,5 @@ public class SimplePartitionStore<I extends WritableComparable,
   }
 
   @Override
-  public void putPartition(Partition<I, V, E, M> partition) { }
+  public void putPartition(Partition<I, V, E> partition) { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
index bf34ecd..37ce8c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
@@ -37,11 +37,10 @@ import java.util.List;
  * @param <I> Vertex id type
  * @param <V> Vertex value type
  * @param <E> Edge value type
- * @param <M> Message data type
  */
 public class SimpleRangeMasterPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> implements
-    MasterGraphPartitioner<I, V, E, M> {
+    V extends Writable, E extends Writable> implements
+    MasterGraphPartitioner<I, V, E> {
   /** Class logger */
   private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
   /** Provided configuration */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
index f94c14b..ab2afd5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
@@ -35,12 +35,10 @@ import java.util.List;
  * @param <I> Vertex id type
  * @param <V> Vertex value type
  * @param <E> Edge value type
- * @param <M> Message data type
  */
 public abstract class SimpleRangeWorkerPartitioner<I extends
-    WritableComparable, V extends Writable, E extends Writable,
-    M extends Writable>
-    implements WorkerGraphPartitioner<I, V, E, M> {
+    WritableComparable, V extends Writable, E extends Writable>
+    implements WorkerGraphPartitioner<I, V, E> {
   /** List of {@link PartitionOwner}s for this worker. */
   private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
   /** Vertex keys space size. */
@@ -88,7 +86,7 @@ public abstract class SimpleRangeWorkerPartitioner<I extends
   @Override
   public Collection<PartitionStats> finalizePartitionStats(
       Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E, M> partitionStore) {
+      PartitionStore<I, V, E> partitionStore) {
     // No modification necessary
     return workerPartitionStats;
   }
@@ -97,7 +95,7 @@ public abstract class SimpleRangeWorkerPartitioner<I extends
   public PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E, M> partitionStore) {
+      PartitionStore<I, V, E> partitionStore) {
     return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
         myWorkerInfo, masterSetPartitionOwners, partitionStore);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
index 5a78b1d..004ea81 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java
@@ -32,11 +32,10 @@ import java.util.Collection;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface WorkerGraphPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /**
    * Instantiate the {@link PartitionOwner} implementation used to read the
    * master assignments.
@@ -67,7 +66,7 @@ public interface WorkerGraphPartitioner<I extends WritableComparable,
    */
   Collection<PartitionStats> finalizePartitionStats(
       Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E, M> partitionStore);
+      PartitionStore<I, V, E> partitionStore);
 
   /**
    * Get the partitions owners and update locally.  Returns the partitions
@@ -83,7 +82,7 @@ public interface WorkerGraphPartitioner<I extends WritableComparable,
   PartitionExchange updatePartitionOwners(
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E, M> partitionStore);
+      PartitionStore<I, V, E> partitionStore);
 
   /**
    * Get a collection of the {@link PartitionOwner} objects.

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 2cfa661..9b3f165 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -38,7 +38,7 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   /** Extended data output */
   private ExtendedDataOutput extendedDataOutput;
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration;
+  private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
 
   /**
    * Create a new data object.
@@ -149,7 +149,7 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   }
 
   @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
+  public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
     return configuration;
   }