You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:08 UTC

[12/12] git commit: updated refs/heads/trunk to 8811165

GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8811165e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8811165e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8811165e

Branch: refs/heads/trunk
Commit: 8811165e85ae2db442e34f1021db29f4dfcc8430
Parents: 9f7a347
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Mon May 20 10:24:19 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Mon May 20 10:26:04 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../io/accumulo/AccumuloVertexInputFormat.java     |    6 +-
 .../io/accumulo/TestAccumuloVertexFormat.java      |   18 +-
 .../edgemarker/AccumuloEdgeInputFormat.java        |    4 +-
 .../edgemarker/AccumuloEdgeOutputFormat.java       |    2 +-
 .../giraph/benchmark/AggregatorsBenchmark.java     |   14 +-
 .../apache/giraph/benchmark/PageRankBenchmark.java |    6 +-
 .../giraph/benchmark/PageRankComputation.java      |   58 +++
 .../apache/giraph/benchmark/PageRankVertex.java    |   55 ---
 .../giraph/benchmark/RandomMessageBenchmark.java   |   23 +-
 .../giraph/benchmark/ShortestPathsBenchmark.java   |    4 +-
 .../giraph/benchmark/ShortestPathsComputation.java |   76 ++++
 .../giraph/benchmark/ShortestPathsVertex.java      |   65 ----
 .../benchmark/WeightedPageRankBenchmark.java       |    6 +-
 .../benchmark/WeightedPageRankComputation.java     |   72 ++++
 .../giraph/benchmark/WeightedPageRankVertex.java   |   66 ----
 .../java/org/apache/giraph/bsp/BspService.java     |   22 +-
 .../org/apache/giraph/bsp/CentralizedService.java  |    3 +-
 .../giraph/bsp/CentralizedServiceMaster.java       |    5 +-
 .../giraph/bsp/CentralizedServiceWorker.java       |   20 +-
 .../java/org/apache/giraph/comm/SendCache.java     |    6 +-
 .../java/org/apache/giraph/comm/SendEdgeCache.java |    2 +-
 .../org/apache/giraph/comm/SendMessageCache.java   |    5 +-
 .../org/apache/giraph/comm/SendMutationsCache.java |   37 +-
 .../org/apache/giraph/comm/SendPartitionCache.java |   22 +-
 .../java/org/apache/giraph/comm/ServerData.java    |   63 ++--
 .../java/org/apache/giraph/comm/WorkerClient.java  |    3 +-
 .../giraph/comm/WorkerClientRequestProcessor.java  |   11 +-
 .../java/org/apache/giraph/comm/WorkerServer.java  |   10 +-
 .../messages/ByteArrayMessagesPerVertexStore.java  |   29 +-
 .../comm/messages/DiskBackedMessageStore.java      |   25 +-
 .../DiskBackedMessageStoreByPartition.java         |   30 +-
 .../comm/messages/InMemoryMessageStoreFactory.java |   76 ++++
 .../giraph/comm/messages/MessageStoreFactory.java  |    8 +-
 .../comm/messages/OneMessagePerVertexStore.java    |   32 +-
 .../comm/messages/SequentialFileMessageStore.java  |   24 +-
 .../giraph/comm/messages/SimpleMessageStore.java   |   13 +-
 .../giraph/comm/netty/NettyMasterClient.java       |    4 +-
 .../giraph/comm/netty/NettyMasterServer.java       |    2 +-
 .../NettyWorkerAggregatorRequestProcessor.java     |    8 +-
 .../giraph/comm/netty/NettyWorkerClient.java       |   15 +-
 .../netty/NettyWorkerClientRequestProcessor.java   |   84 ++--
 .../giraph/comm/netty/NettyWorkerServer.java       |   78 ++---
 .../netty/handler/WorkerRequestServerHandler.java  |   18 +-
 .../SendPartitionCurrentMessagesRequest.java       |   11 +-
 .../requests/SendPartitionMutationsRequest.java    |   23 +-
 .../giraph/comm/requests/SendVertexRequest.java    |   11 +-
 .../comm/requests/SendWorkerMessagesRequest.java   |    3 +-
 .../apache/giraph/comm/requests/WorkerRequest.java |    5 +-
 .../giraph/comm/requests/WritableRequest.java      |   13 +-
 .../java/org/apache/giraph/conf/AllOptions.java    |    4 +-
 .../DefaultImmutableClassesGiraphConfigurable.java |   11 +-
 .../java/org/apache/giraph/conf/GiraphClasses.java |  176 +++++-----
 .../apache/giraph/conf/GiraphConfiguration.java    |   34 +--
 .../org/apache/giraph/conf/GiraphConstants.java    |   26 +-
 .../conf/ImmutableClassesGiraphConfigurable.java   |    9 +-
 .../conf/ImmutableClassesGiraphConfiguration.java  |  178 +++++-----
 .../apache/giraph/edge/ConfigurableOutEdges.java   |    4 +-
 .../java/org/apache/giraph/edge/EdgeStore.java     |   15 +-
 .../apache/giraph/edge/MutableEdgesIterable.java   |    4 +-
 .../apache/giraph/edge/MutableEdgesWrapper.java    |    2 +-
 .../org/apache/giraph/graph/BasicComputation.java  |   35 ++
 .../java/org/apache/giraph/graph/Computation.java  |  260 +++++++++++++
 .../org/apache/giraph/graph/ComputeCallable.java   |   80 ++---
 .../apache/giraph/graph/DefaultVertexResolver.java |   69 ++---
 .../giraph/graph/DefaultVertexValueFactory.java    |    2 +-
 .../giraph/graph/GiraphTransferRegulator.java      |    5 +-
 .../java/org/apache/giraph/graph/GraphMapper.java  |    7 +-
 .../java/org/apache/giraph/graph/GraphState.java   |   64 +---
 .../org/apache/giraph/graph/GraphStateAware.java   |   47 ---
 .../org/apache/giraph/graph/GraphTaskManager.java  |  110 +++---
 .../main/java/org/apache/giraph/graph/Vertex.java  |  193 +----------
 .../org/apache/giraph/graph/VertexChanges.java     |    5 +-
 .../org/apache/giraph/graph/VertexMutations.java   |   22 +-
 .../org/apache/giraph/graph/VertexResolver.java    |   17 +-
 .../apache/giraph/graph/VertexValueFactory.java    |    2 +-
 .../SuperstepHashPartitionerFactory.java           |   14 +-
 .../main/java/org/apache/giraph/io/EdgeReader.java |    2 +-
 .../org/apache/giraph/io/GiraphInputFormat.java    |    2 +-
 .../org/apache/giraph/io/SimpleVertexWriter.java   |    2 +-
 .../org/apache/giraph/io/VertexOutputFormat.java   |    2 +-
 .../java/org/apache/giraph/io/VertexReader.java    |    4 +-
 .../org/apache/giraph/io/VertexValueReader.java    |    4 +-
 .../java/org/apache/giraph/io/VertexWriter.java    |    2 +-
 .../giraph/io/filters/DefaultEdgeInputFilter.java  |    2 +-
 .../io/filters/DefaultVertexInputFilter.java       |    6 +-
 .../giraph/io/filters/VertexInputFilter.java       |    5 +-
 .../AdjacencyListTextVertexOutputFormat.java       |    2 +-
 .../io/formats/IdWithValueTextOutputFormat.java    |    2 +-
 .../formats/IntIntTextVertexValueInputFormat.java  |    6 +-
 .../io/formats/JsonBase64VertexOutputFormat.java   |    2 +-
 ...JsonLongDoubleFloatDoubleVertexInputFormat.java |    5 +-
 ...sonLongDoubleFloatDoubleVertexOutputFormat.java |    3 +-
 .../PseudoRandomIntNullVertexInputFormat.java      |    4 +-
 .../io/formats/PseudoRandomVertexInputFormat.java  |    4 +-
 .../io/formats/SequenceFileVertexInputFormat.java  |    6 +-
 .../io/formats/SequenceFileVertexOutputFormat.java |    2 +-
 ...DoubleDoubleAdjacencyListVertexInputFormat.java |    5 +-
 .../giraph/io/formats/TextVertexInputFormat.java   |   14 +-
 .../giraph/io/formats/TextVertexOutputFormat.java  |    2 +-
 .../io/formats/TextVertexValueInputFormat.java     |    3 +-
 .../formats/multi/EdgeInputFormatDescription.java  |   12 +-
 .../io/formats/multi/MultiEdgeInputFormat.java     |    2 +-
 .../io/formats/multi/MultiVertexInputFormat.java   |    2 +-
 .../multi/VertexInputFormatDescription.java        |   12 +-
 .../giraph/io/internal/WrappedEdgeReader.java      |    4 +-
 .../io/internal/WrappedVertexOutputFormat.java     |    4 +-
 .../giraph/io/internal/WrappedVertexReader.java    |    6 +-
 .../giraph/io/iterables/EdgeReaderWrapper.java     |    2 +-
 .../giraph/io/iterables/VertexReaderWrapper.java   |   12 +-
 .../MultiThreadedSuperstepOutput.java              |    8 +-
 .../io/superstep_output/NoOpSuperstepOutput.java   |    2 +-
 .../SynchronizedSuperstepOutput.java               |    7 +-
 .../giraph/job/GiraphConfigurationValidator.java   |   52 ++--
 .../org/apache/giraph/master/BspServiceMaster.java |   48 ++-
 .../giraph/master/MasterAggregatorHandler.java     |    4 +-
 .../org/apache/giraph/master/MasterCompute.java    |   98 +++---
 .../org/apache/giraph/master/MasterThread.java     |    7 +-
 .../org/apache/giraph/master/SuperstepClasses.java |  160 ++++++++
 .../apache/giraph/partition/BasicPartition.java    |   20 +-
 .../giraph/partition/ByteArrayPartition.java       |   29 +-
 .../giraph/partition/DefaultPartitionContext.java  |   34 --
 .../giraph/partition/DiskBackedPartitionStore.java |   78 ++--
 .../giraph/partition/GraphPartitionerFactory.java  |    7 +-
 .../giraph/partition/HashMasterPartitioner.java    |    5 +-
 .../giraph/partition/HashPartitionerFactory.java   |   13 +-
 .../partition/HashRangePartitionerFactory.java     |   15 +-
 .../partition/HashRangeWorkerPartitioner.java      |    5 +-
 .../giraph/partition/HashWorkerPartitioner.java    |    9 +-
 .../giraph/partition/MasterGraphPartitioner.java   |    3 +-
 .../org/apache/giraph/partition/Partition.java     |   24 +-
 .../apache/giraph/partition/PartitionContext.java  |   45 ---
 .../apache/giraph/partition/PartitionStore.java    |   13 +-
 .../giraph/partition/RangeMasterPartitioner.java   |    5 +-
 .../giraph/partition/RangePartitionerFactory.java  |    5 +-
 .../apache/giraph/partition/RangeSplitHint.java    |    2 +-
 .../giraph/partition/RangeWorkerPartitioner.java   |    5 +-
 .../giraph/partition/ReusesObjectsPartition.java   |    4 +-
 .../SimpleIntRangePartitionerFactory.java          |   13 +-
 .../SimpleLongRangePartitionerFactory.java         |   13 +-
 .../apache/giraph/partition/SimplePartition.java   |   31 +-
 .../giraph/partition/SimplePartitionStore.java     |   21 +-
 .../partition/SimpleRangeMasterPartitioner.java    |    5 +-
 .../partition/SimpleRangeWorkerPartitioner.java    |   10 +-
 .../giraph/partition/WorkerGraphPartitioner.java   |    7 +-
 .../apache/giraph/utils/ByteArrayVertexIdData.java |    4 +-
 .../giraph/utils/ByteArrayVertexIdEdges.java       |    4 +-
 .../giraph/utils/ByteArrayVertexIdMessages.java    |   26 +-
 .../apache/giraph/utils/ConfigurationUtils.java    |   17 +-
 .../giraph/utils/InMemoryVertexInputFormat.java    |    6 +-
 .../apache/giraph/utils/InternalVertexRunner.java  |   14 +-
 .../org/apache/giraph/utils/ReflectionUtils.java   |   20 +
 .../java/org/apache/giraph/utils/TestGraph.java    |   32 +-
 .../org/apache/giraph/utils/VertexIdIterator.java  |    2 +-
 .../org/apache/giraph/utils/WritableUtils.java     |  109 ++++--
 .../org/apache/giraph/worker/BspServiceWorker.java |  125 +++----
 .../giraph/worker/EdgeInputSplitsCallable.java     |   25 +-
 .../worker/EdgeInputSplitsCallableFactory.java     |   22 +-
 .../apache/giraph/worker/InputSplitsCallable.java  |   40 +--
 .../giraph/worker/VertexInputSplitsCallable.java   |   34 +--
 .../worker/VertexInputSplitsCallableFactory.java   |   22 +-
 .../giraph/worker/WorkerAggregatorHandler.java     |    4 +-
 .../org/apache/giraph/worker/WorkerContext.java    |   16 +-
 .../org/apache/giraph/comm/ConnectionTest.java     |   17 +-
 .../org/apache/giraph/comm/RequestFailureTest.java |   22 +-
 .../java/org/apache/giraph/comm/RequestTest.java   |   58 ++--
 .../org/apache/giraph/comm/SaslConnectionTest.java |   13 +-
 .../org/apache/giraph/comm/TestMessageStores.java  |   38 +--
 .../org/apache/giraph/conf/TestObjectCreation.java |   20 +-
 .../apache/giraph/graph/TestVertexAndEdges.java    |   37 +-
 .../TestAdjacencyListTextVertexOutputFormat.java   |   20 +-
 .../java/org/apache/giraph/io/TestEdgeInput.java   |   29 +-
 .../java/org/apache/giraph/io/TestFilters.java     |   12 +-
 .../giraph/io/TestIdWithValueTextOutputFormat.java |   17 +-
 .../org/apache/giraph/io/TestJsonBase64Format.java |   17 +-
 ...DoubleDoubleAdjacencyListVertexInputFormat.java |   33 +-
 ...DoubleDoubleAdjacencyListVertexInputFormat.java |   55 +--
 .../master/TestComputationCombinerTypes.java       |  160 ++++++++
 .../apache/giraph/master/TestMasterObserver.java   |   55 +++-
 .../apache/giraph/master/TestSwitchClasses.java    |  268 +++++++++++++
 .../partition/TestGiraphTransferRegulator.java     |   13 +-
 .../giraph/partition/TestPartitionStores.java      |  111 +++----
 .../apache/giraph/utils/ComputationCountEdges.java |   41 ++
 .../giraph/utils/IntIntNullNoOpComputation.java    |   30 ++
 .../apache/giraph/utils/IntNoOpComputation.java    |   29 ++
 .../apache/giraph/utils/LongNoOpComputation.java   |   29 ++
 .../java/org/apache/giraph/utils/MockUtils.java    |   87 ++---
 .../org/apache/giraph/utils/NoOpComputation.java   |   44 +++
 .../giraph/vertices/IntIntNullVertexDoNothing.java |   25 --
 .../apache/giraph/vertices/VertexCountEdges.java   |   33 --
 .../apache/giraph/vertices/VertexDoNothing.java    |   33 --
 .../java/org/apache/giraph/yarn/TestYarnJob.java   |   12 +-
 .../examples/AggregatorsTestComputation.java       |  137 +++++++
 .../giraph/examples/AggregatorsTestVertex.java     |  134 -------
 .../examples/ConnectedComponentsComputation.java   |  105 ++++++
 .../giraph/examples/ConnectedComponentsVertex.java |  101 -----
 .../giraph/examples/IdentityComputation.java       |   47 +++
 .../org/apache/giraph/examples/IdentityVertex.java |   45 ---
 .../examples/LongDoubleDoubleTextInputFormat.java  |   13 +-
 .../examples/LongDoubleNullTextInputFormat.java    |   13 +-
 ...NormalizingLongDoubleDoubleTextInputFormat.java |   15 +-
 .../giraph/examples/PageRankComputation.java       |   55 +++
 .../org/apache/giraph/examples/PageRankVertex.java |   54 ---
 .../examples/PartitionContextTestVertex.java       |  115 ------
 .../giraph/examples/RandomWalkComputation.java     |  173 +++++++++
 .../apache/giraph/examples/RandomWalkVertex.java   |  163 --------
 .../examples/RandomWalkVertexMasterCompute.java    |   18 +-
 .../examples/RandomWalkWithRestartComputation.java |   85 +++++
 .../examples/RandomWalkWithRestartVertex.java      |   80 ----
 .../giraph/examples/RandomWalkWorkerContext.java   |    8 +-
 .../apache/giraph/examples/SimpleCheckpoint.java   |  292 +++++++++++++++
 .../giraph/examples/SimpleCheckpointVertex.java    |  286 --------------
 .../giraph/examples/SimpleCombinerComputation.java |   70 ++++
 .../giraph/examples/SimpleCombinerVertex.java      |   65 ----
 .../giraph/examples/SimpleFailComputation.java     |   74 ++++
 .../apache/giraph/examples/SimpleFailVertex.java   |   69 ----
 .../examples/SimpleInDegreeCountComputation.java   |   58 +++
 .../giraph/examples/SimpleInDegreeCountVertex.java |   53 ---
 ...eLongDoubleDoubleDoubleIdentityComputation.java |   32 ++
 ...SimpleLongDoubleDoubleDoubleIdentityVertex.java |   32 --
 .../examples/SimpleMasterComputeComputation.java   |  112 ++++++
 .../giraph/examples/SimpleMasterComputeVertex.java |  108 ------
 .../giraph/examples/SimpleMsgComputation.java      |   67 ++++
 .../apache/giraph/examples/SimpleMsgVertex.java    |   62 ---
 .../examples/SimpleMutateGraphComputation.java     |  198 ++++++++++
 .../giraph/examples/SimpleMutateGraphVertex.java   |  197 ----------
 .../examples/SimpleOutDegreeCountComputation.java  |   46 +++
 .../examples/SimpleOutDegreeCountVertex.java       |   43 ---
 .../giraph/examples/SimplePageRankComputation.java |  250 ++++++++++++
 .../giraph/examples/SimplePageRankVertex.java      |  248 ------------
 .../examples/SimpleShortestPathsComputation.java   |   86 +++++
 .../giraph/examples/SimpleShortestPathsVertex.java |   81 ----
 .../examples/SimpleSuperstepComputation.java       |  152 ++++++++
 .../giraph/examples/SimpleSuperstepVertex.java     |  150 --------
 .../examples/SimpleTextVertexOutputFormat.java     |    2 +-
 .../examples/SimpleTriangleClosingComputation.java |  154 ++++++++
 .../examples/SimpleTriangleClosingVertex.java      |  151 --------
 .../examples/SimpleVertexWithWorkerContext.java    |   22 +-
 .../examples/TestComputationStateComputation.java  |  109 ++++++
 .../org/apache/giraph/examples/VerifyMessage.java  |   50 ++--
 ...xWithDoubleValueDoubleEdgeTextOutputFormat.java |    2 +-
 ...texWithDoubleValueNullEdgeTextOutputFormat.java |    2 +-
 .../java/org/apache/giraph/TestAutoCheckpoint.java |   16 +-
 .../test/java/org/apache/giraph/TestBspBasic.java  |   85 ++---
 .../org/apache/giraph/TestComputationState.java    |   65 ++++
 .../org/apache/giraph/TestGraphPartitioner.java    |   54 ++--
 .../org/apache/giraph/TestManualCheckpoint.java    |   28 +-
 .../java/org/apache/giraph/TestMaxSuperstep.java   |   18 +-
 .../java/org/apache/giraph/TestMutateGraph.java    |   10 +-
 .../org/apache/giraph/TestNotEnoughMapTasks.java   |   10 +-
 .../org/apache/giraph/TestPartitionContext.java    |   67 ----
 .../aggregators/TestAggregatorsHandling.java       |   26 +-
 .../ConnectedComponentsComputationTest.java        |  120 ++++++
 ...ConnectedComponentsComputationTestInMemory.java |  127 +++++++
 .../examples/ConnectedComponentsVertexTest.java    |  120 ------
 .../ConnectedComponentsVertexTestInMemory.java     |  130 -------
 .../giraph/examples/PageRankComputationTest.java   |   79 ++++
 .../apache/giraph/examples/PageRankVertexTest.java |   78 ----
 .../RandomWalkWithRestartComputationTest.java      |  111 ++++++
 .../examples/RandomWalkWithRestartVertexTest.java  |  109 ------
 .../SimpleShortestPathsComputationTest.java        |  164 ++++++++
 .../examples/SimpleShortestPathsVertexTest.java    |  161 --------
 .../SimpleTriangleClosingComputationTest.java      |   92 +++++
 .../examples/SimpleTriangleClosingVertexTest.java  |   93 -----
 .../org/apache/giraph/examples/TestPageRank.java   |   14 +-
 .../examples/TryMultiIpcBindingPortsTest.java      |    2 +-
 .../apache/giraph/vertex/TestComputationTypes.java |  244 ++++++++++++
 .../org/apache/giraph/vertex/TestVertexTypes.java  |  248 ------------
 .../io/hbase/TestHBaseRootMarkerVertextFormat.java |   14 +-
 .../io/hbase/edgemarker/TableEdgeInputFormat.java  |    4 +-
 .../io/hbase/edgemarker/TableEdgeOutputFormat.java |    2 +-
 .../giraph/io/hcatalog/HCatGiraphRunner.java       |   34 +-
 .../io/hcatalog/HCatalogVertexInputFormat.java     |    8 +-
 .../io/hcatalog/HCatalogVertexOutputFormat.java    |   10 +-
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |   38 +-
 .../DefaultConfigurableAndTableSchemaAware.java    |    6 +-
 .../giraph/hive/input/edge/AbstractHiveToEdge.java |    4 +-
 .../hive/input/edge/HiveEdgeInputFormat.java       |    2 +-
 .../giraph/hive/input/edge/HiveEdgeReader.java     |    2 +-
 .../hive/input/vertex/AbstractHiveToVertex.java    |    4 +-
 .../giraph/hive/input/vertex/HiveToVertex.java     |    2 +-
 .../hive/input/vertex/HiveVertexInputFormat.java   |    2 +-
 .../giraph/hive/input/vertex/HiveVertexReader.java |    6 +-
 .../hive/input/vertex/SimpleHiveToVertex.java      |    8 +-
 .../giraph/hive/output/AbstractVertexToHive.java   |    2 +-
 .../giraph/hive/output/HiveVertexOutputFormat.java |    2 +-
 .../giraph/hive/output/HiveVertexWriter.java       |    2 +-
 .../giraph/hive/output/SimpleVertexToHive.java     |    4 +-
 .../apache/giraph/hive/output/VertexToHive.java    |    2 +-
 .../output/examples/HiveOutputIntIntVertex.java    |    2 +-
 .../hive/computations/ComputationCountEdges.java   |   36 ++
 .../hive/computations/ComputationSumEdges.java     |   42 ++
 .../giraph/hive/computations/package-info.java     |   22 ++
 .../giraph/hive/input/HiveEdgeInputTest.java       |   10 +-
 .../giraph/hive/input/HiveVertexInputTest.java     |   12 +-
 .../apache/giraph/hive/output/HiveOutputTest.java  |    4 +-
 .../giraph/hive/vertexes/VertexCountEdges.java     |   34 --
 .../giraph/hive/vertexes/VertexSumEdges.java       |   39 --
 .../apache/giraph/hive/vertexes/package-info.java  |   22 --
 299 files changed, 6685 insertions(+), 6079 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 52f8099..c20ded9 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-667: Decouple Vertex data and Computation, make Computation
+  and Combiner classes switchable (majakabiljo)
+
   GIRAPH-608: Spelling error in Combiner.java (Michael Aro via aching)
 
   GIRAPH-666: Netty execs threads and metrics threads don't get 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
index 221fabd..c286ed4 100644
--- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
+++ b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
@@ -69,7 +69,7 @@ public abstract class AccumuloVertexInputFormat<
       extends VertexReader<I, V, E> {
 
     /** Giraph configuration */
-    private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+    private ImmutableClassesGiraphConfiguration<I, V, E>
     configuration;
     /**
      * Used by subclasses to read key/value pairs.
@@ -86,7 +86,7 @@ public abstract class AccumuloVertexInputFormat<
       this.reader = reader;
     }
 
-    public ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+    public ImmutableClassesGiraphConfiguration<I, V, E>
     getConfiguration() {
       return configuration;
     }
@@ -105,7 +105,7 @@ public abstract class AccumuloVertexInputFormat<
       reader.initialize(inputSplit, context);
       this.context = context;
       this.configuration =
-          new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(
+          new ImmutableClassesGiraphConfiguration<I, V, E>(
               context.getConfiguration());
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
index 6698c9a..0ee9666 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.giraph.BspCase;
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
 import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
@@ -133,7 +134,7 @@ public class TestAccumuloVertexFormat extends BspCase{
         GiraphJob job = new GiraphJob(conf, getCallingMethodName());
         setupConfiguration(job);
         GiraphConfiguration giraphConf = job.getConfiguration();
-        giraphConf.setVertexClass(EdgeNotification.class);
+        giraphConf.setComputationClass(EdgeNotification.class);
         giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
         giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
 
@@ -170,16 +171,17 @@ public class TestAccumuloVertexFormat extends BspCase{
     The test set only has a 1-1 parent-to-child ratio for this unit test.
      */
     public static class EdgeNotification
-            extends Vertex<Text, Text, Text, Text> {
-        @Override
-        public void compute(Iterable<Text> messages) throws IOException {
+            extends BasicComputation<Text, Text, Text, Text> {
+      @Override
+      public void compute(Vertex<Text, Text, Text> vertex,
+          Iterable<Text> messages) throws IOException {
           for (Text message : messages) {
-            getValue().set(message);
+            vertex.getValue().set(message);
           }
           if(getSuperstep() == 0) {
-            sendMessageToAllEdges(getId());
+            sendMessageToAllEdges(vertex, vertex.getId());
           }
-          voteToHalt();
-        }
+        vertex.voteToHalt();
+      }
     }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
index 5855cfc..108ae61 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
@@ -73,11 +73,11 @@ public class AccumuloEdgeInputFormat
     /*
    Each Key/Value contains the information needed to construct the vertices.
      */
-    public Vertex<Text, Text, Text, ?> getCurrentVertex()
+    public Vertex<Text, Text, Text> getCurrentVertex()
         throws IOException, InterruptedException {
       Key key = getRecordReader().getCurrentKey();
       Value value = getRecordReader().getCurrentValue();
-      Vertex<Text, Text, Text, ?> vertex =
+      Vertex<Text, Text, Text> vertex =
           getConfiguration().createVertex();
       Text vertexId = key.getRow();
       List<Edge<Text, Text>> edges = Lists.newLinkedList();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
index f999497..c2ebbe2 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
@@ -64,7 +64,7 @@ public class AccumuloEdgeOutputFormat
      Write back a mutation that adds a qualifier for 'parent' containing the vertex value
      as the cell value. Assume the vertex ID corresponds to a key.
      */
-    public void writeVertex(Vertex<Text, Text, Text, ?> vertex)
+    public void writeVertex(Vertex<Text, Text, Text> vertex)
         throws IOException, InterruptedException {
       RecordWriter<Text, Mutation> writer = getRecordWriter();
       Mutation mt = new Mutation(vertex.getId());

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index 12304bb..539bd7d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
@@ -52,10 +53,13 @@ public class AggregatorsBenchmark extends GiraphBenchmark {
   /**
    * Vertex class for AggregatorsBenchmark
    */
-  public static class AggregatorsBenchmarkVertex extends
-      Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+  public static class AggregatorsBenchmarkComputation extends
+      BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
+          DoubleWritable> {
     @Override
-    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
       int n = getNumAggregators(getConf());
       long superstep = getSuperstep();
       int w = getWorkerContextAggregated(getConf(), superstep);
@@ -71,7 +75,7 @@ public class AggregatorsBenchmark extends GiraphBenchmark {
             ((LongWritable) getAggregatedValue("p" + i)).get());
       }
       if (superstep > 2) {
-        voteToHalt();
+        vertex.voteToHalt();
       }
     }
   }
@@ -201,7 +205,7 @@ public class AggregatorsBenchmark extends GiraphBenchmark {
   @Override
   protected void prepareConfiguration(GiraphConfiguration conf,
       CommandLine cmd) {
-    conf.setVertexClass(AggregatorsBenchmarkVertex.class);
+    conf.setComputationClass(AggregatorsBenchmarkComputation.class);
     conf.setMasterComputeClass(AggregatorsBenchmarkMasterCompute.class);
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     conf.setWorkerContextClass(AggregatorsBenchmarkWorkerContext.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 0f8d284..bd2939e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -31,7 +31,7 @@ import com.google.common.collect.Sets;
 import java.util.Set;
 
 /**
- * Benchmark for {@link PageRankVertex}
+ * Benchmark for {@link PageRankComputation}
  */
 public class PageRankBenchmark extends GiraphBenchmark {
   @Override
@@ -44,7 +44,7 @@ public class PageRankBenchmark extends GiraphBenchmark {
   @Override
   protected void prepareConfiguration(GiraphConfiguration conf,
       CommandLine cmd) {
-    conf.setVertexClass(PageRankVertex.class);
+    conf.setComputationClass(PageRankComputation.class);
     conf.setOutEdgesClass(IntNullArrayEdges.class);
     conf.setCombinerClass(FloatSumCombiner.class);
     conf.setVertexInputFormatClass(
@@ -54,7 +54,7 @@ public class PageRankBenchmark extends GiraphBenchmark {
         BenchmarkOption.VERTICES.getOptionIntValue(cmd));
     conf.setInt(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
         BenchmarkOption.EDGES_PER_VERTEX.getOptionIntValue(cmd));
-    conf.setInt(PageRankVertex.SUPERSTEP_COUNT,
+    conf.setInt(PageRankComputation.SUPERSTEP_COUNT,
         BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
     conf.setFloat(PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
         BenchmarkOption.LOCAL_EDGES_MIN_RATIO.getOptionFloatValue(cmd,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
new file mode 100644
index 0000000..e891ff2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of PageRank in which vertex ids are ints, page rank values
+ * are floats, and graph is unweighted.
+ */
+public class PageRankComputation extends BasicComputation<IntWritable,
+    FloatWritable, NullWritable, FloatWritable> {
+  /** Number of supersteps */
+  public static final String SUPERSTEP_COUNT =
+      "giraph.pageRank.superstepCount";
+
+  @Override
+  public void compute(
+      Vertex<IntWritable, FloatWritable, NullWritable> vertex,
+      Iterable<FloatWritable> messages) throws IOException {
+    if (getSuperstep() >= 1) {
+      float sum = 0;
+      for (FloatWritable message : messages) {
+        sum += message.get();
+      }
+      vertex.getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum);
+    }
+
+    if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
+      sendMessageToAllEdges(vertex,
+          new FloatWritable(vertex.getValue().get() / vertex.getNumEdges()));
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
deleted file mode 100644
index 9900a44..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.benchmark;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-/**
- * Implementation of PageRank in which vertex ids are ints, page rank values
- * are floats, and graph is unweighted.
- */
-public class PageRankVertex extends Vertex<IntWritable, FloatWritable,
-    NullWritable, FloatWritable> {
-  /** Number of supersteps */
-  public static final String SUPERSTEP_COUNT =
-      "giraph.pageRank.superstepCount";
-
-  @Override
-  public void compute(Iterable<FloatWritable> messages) throws IOException {
-    if (getSuperstep() >= 1) {
-      float sum = 0;
-      for (FloatWritable message : messages) {
-        sum += message.get();
-      }
-      getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum);
-    }
-
-    if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
-      sendMessageToAllEdges(
-          new FloatWritable(getValue().get() / getNumEdges()));
-    } else {
-      voteToHalt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
index 5c7e019..cf2e6eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
@@ -35,6 +36,7 @@ import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
 
+import java.io.IOException;
 import java.util.Random;
 import java.util.Set;
 
@@ -257,25 +259,26 @@ public class RandomMessageBenchmark extends GiraphBenchmark {
   /**
    * Actual message computation (messaging in this case)
    */
-  public static class RandomMessageVertex extends Vertex<LongWritable,
-      DoubleWritable, DoubleWritable, BytesWritable> {
+  public static class RandomMessageComputation extends BasicComputation<
+      LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
     @Override
-    public void compute(Iterable<BytesWritable> messages) {
-      RandomMessageBenchmarkWorkerContext workerContext =
-          (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+        Iterable<BytesWritable> messages) throws IOException {
+      RandomMessageBenchmarkWorkerContext workerContext = getWorkerContext();
       if (getSuperstep() < workerContext.getNumSupersteps()) {
         for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
           workerContext.randomizeMessageBytes();
-          sendMessageToAllEdges(
+          sendMessageToAllEdges(vertex,
               new BytesWritable(workerContext.getMessageBytes()));
           long bytesSent = workerContext.getMessageBytes().length *
-              getNumEdges();
+              vertex.getNumEdges();
           aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
           aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
-              new LongWritable(getNumEdges()));
+              new LongWritable(vertex.getNumEdges()));
         }
       } else {
-        voteToHalt();
+        vertex.voteToHalt();
       }
     }
   }
@@ -290,7 +293,7 @@ public class RandomMessageBenchmark extends GiraphBenchmark {
   @Override
   protected void prepareConfiguration(GiraphConfiguration conf,
       CommandLine cmd) {
-    conf.setVertexClass(RandomMessageVertex.class);
+    conf.setComputationClass(RandomMessageComputation.class);
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
     conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 8e6c877..0dd4529 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -58,13 +58,13 @@ public class ShortestPathsBenchmark extends GiraphBenchmark {
   @Override
   protected void prepareConfiguration(GiraphConfiguration conf,
       CommandLine cmd) {
-    conf.setVertexClass(ShortestPathsVertex.class);
+    conf.setComputationClass(ShortestPathsComputation.class);
     if (EDGES_CLASS.getOptionIntValue(cmd, 1) == 1) {
       conf.setOutEdgesClass(ArrayListEdges.class);
     } else {
       conf.setOutEdgesClass(HashMapEdges.class);
     }
-    LOG.info("Using class " + GiraphConstants.VERTEX_CLASS.get(conf));
+    LOG.info("Using class " + GiraphConstants.COMPUTATION_CLASS.get(conf));
     conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
     if (!NO_COMBINER.optionTurnedOn(cmd)) {
       conf.setCombinerClass(MinimumDoubleCombiner.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
new file mode 100644
index 0000000..ad0600c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Shortest paths algorithm.
+ */
+public class ShortestPathsComputation extends BasicComputation<LongWritable,
+    DoubleWritable, DoubleWritable, DoubleWritable> {
+  /** Source id. */
+  public static final String SOURCE_ID =
+      "giraph.shortestPathsBenchmark.sourceId";
+  /** Default source id. */
+  public static final long SOURCE_ID_DEFAULT = 1;
+
+  /**
+   * Check if vertex is source from which to calculate shortest paths.
+   *
+   * @param vertex Vertex
+   * @return True iff vertex is source for shortest paths
+   */
+  private boolean isSource(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex) {
+    return vertex.getId().get() ==
+        getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+  }
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
+    }
+
+    double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
+    for (DoubleWritable message : messages) {
+      minDist = Math.min(minDist, message.get());
+    }
+
+    if (minDist < vertex.getValue().get()) {
+      vertex.setValue(new DoubleWritable(minDist));
+      for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+        double distance = minDist + edge.getValue().get();
+        sendMessage(edge.getTargetVertexId(),
+            new DoubleWritable(distance));
+      }
+    }
+
+    vertex.voteToHalt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java
deleted file mode 100644
index c1b77d1..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.benchmark;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/**
- * Shortest paths algorithm.
- */
-public class ShortestPathsVertex extends Vertex<LongWritable, DoubleWritable,
-    DoubleWritable, DoubleWritable> {
-  /** Source id. */
-  public static final String SOURCE_ID =
-      "giraph.shortestPathsBenchmark.sourceId";
-  /** Default source id. */
-  public static final long SOURCE_ID_DEFAULT = 1;
-
-  private boolean isSource() {
-    return getId().get() == getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
-  }
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    if (getSuperstep() == 0) {
-      setValue(new DoubleWritable(Double.MAX_VALUE));
-    }
-
-    double minDist = isSource() ? 0d : Double.MAX_VALUE;
-    for (DoubleWritable message : messages) {
-      minDist = Math.min(minDist, message.get());
-    }
-
-    if (minDist < getValue().get()) {
-      setValue(new DoubleWritable(minDist));
-      for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
-        double distance = minDist + edge.getValue().get();
-        sendMessage(edge.getTargetVertexId(),
-            new DoubleWritable(distance));
-      }
-    }
-
-    voteToHalt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
index 3fc514a..2077674 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Sets;
 import java.util.Set;
 
 /**
- * Benchmark for {@link WeightedPageRankVertex}
+ * Benchmark for {@link WeightedPageRankComputation}
  */
 public class WeightedPageRankBenchmark extends GiraphBenchmark {
   /** Class logger */
@@ -88,7 +88,7 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
    */
   protected void prepareConfiguration(GiraphConfiguration configuration,
       CommandLine cmd) {
-    configuration.setVertexClass(WeightedPageRankVertex.class);
+    configuration.setComputationClass(WeightedPageRankComputation.class);
     int edgesClassOption = EDGES_CLASS.getOptionIntValue(cmd, 1);
     switch (edgesClassOption) {
     case 0:
@@ -149,7 +149,7 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
           SimpleLongRangePartitionerFactory.class);
     }
 
-    configuration.setInt(WeightedPageRankVertex.SUPERSTEP_COUNT,
+    configuration.setInt(WeightedPageRankComputation.SUPERSTEP_COUNT,
         BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java
new file mode 100644
index 0000000..18182ed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of Page Rank algorithm on a weighted graph.
+ */
+public class WeightedPageRankComputation extends BasicComputation<LongWritable,
+    DoubleWritable, DoubleWritable, DoubleWritable> {
+  /** Number of supersteps */
+  public static final String SUPERSTEP_COUNT =
+      "giraph.weightedPageRank.superstepCount";
+
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      // Normalize out edge weights
+      double outEdgeSum = 0;
+      for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+        outEdgeSum += edge.getValue().get();
+      }
+      for (MutableEdge<LongWritable, DoubleWritable> edge :
+          vertex.getMutableEdges()) {
+        edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum));
+      }
+    } else {
+      double messageSum = 0;
+      for (DoubleWritable message : messages) {
+        messageSum += message.get();
+      }
+      vertex.getValue().set(
+          (0.15f / getTotalNumVertices()) + 0.85f * messageSum);
+    }
+
+    if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
+      for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+        sendMessage(edge.getTargetVertexId(),
+            new DoubleWritable(
+                vertex.getValue().get() * edge.getValue().get()));
+      }
+    } else {
+      vertex.voteToHalt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
deleted file mode 100644
index 70f0f61..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.benchmark;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.MutableEdge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/**
- * Implementation of Page Rank algorithm on a weighted graph.
- */
-public class WeightedPageRankVertex extends Vertex<LongWritable, DoubleWritable,
-    DoubleWritable, DoubleWritable> {
-  /** Number of supersteps */
-  public static final String SUPERSTEP_COUNT =
-      "giraph.weightedPageRank.superstepCount";
-
-  @Override
-  public void compute(Iterable<DoubleWritable> messages) throws IOException {
-    if (getSuperstep() == 0) {
-      // Normalize out edge weights
-      double outEdgeSum = 0;
-      for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
-        outEdgeSum += edge.getValue().get();
-      }
-      for (MutableEdge<LongWritable, DoubleWritable> edge : getMutableEdges()) {
-        edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum));
-      }
-    } else {
-      double messageSum = 0;
-      for (DoubleWritable message : messages) {
-        messageSum += message.get();
-      }
-      getValue().set((0.15f / getTotalNumVertices()) + 0.85f * messageSum);
-    }
-
-    if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
-      for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
-        sendMessage(edge.getTargetVertexId(),
-            new DoubleWritable(getValue().get() * edge.getValue().get()));
-      }
-    } else {
-      voteToHalt();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 178c96f..ff3f06d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -60,12 +60,11 @@ import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class BspService<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements Watcher, CentralizedService<I, V, E, M> {
+    V extends Writable, E extends Writable>
+    implements Watcher, CentralizedService<I, V, E> {
   /** Unset superstep */
   public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
   /** Input superstep (superstep when loading the vertices happens) */
@@ -218,7 +217,7 @@ public abstract class BspService<I extends WritableComparable,
   private final List<BspEvent> registeredBspEvents =
       new ArrayList<BspEvent>();
   /** Immutable configuration of the job*/
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Job context (mainly for progress) */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Cached superstep (from ZooKeeper) */
@@ -236,9 +235,9 @@ public abstract class BspService<I extends WritableComparable,
   /** Combination of hostname '_' partition (unique id) */
   private final String hostnamePartitionId;
   /** Graph partitioner */
-  private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory;
+  private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
   /** Mapper that will do the graph computation */
-  private final GraphTaskManager<I, V, E, M> graphTaskManager;
+  private final GraphTaskManager<I, V, E> graphTaskManager;
   /** File system */
   private final FileSystem fs;
   /** Checkpoint frequency */
@@ -255,7 +254,7 @@ public abstract class BspService<I extends WritableComparable,
   public BspService(String serverPortList,
       int sessionMsecTimeout,
       Mapper<?, ?, ?, ?>.Context context,
-      GraphTaskManager<I, V, E, M> graphTaskManager) {
+      GraphTaskManager<I, V, E> graphTaskManager) {
     this.vertexInputSplitsEvents = new InputSplitEvents(context);
     this.edgeInputSplitsEvents = new InputSplitEvents(context);
     this.connectedEvent = new PredicateLock(context);
@@ -280,8 +279,7 @@ public abstract class BspService<I extends WritableComparable,
 
     this.context = context;
     this.graphTaskManager = graphTaskManager;
-    this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
-        context.getConfiguration());
+    this.conf = graphTaskManager.getConf();
     this.jobId = conf.get("mapred.job.id", "Unknown Job");
     this.taskPartition = conf.getTaskPartition();
     this.restartedSuperstep = conf.getLong(
@@ -605,7 +603,7 @@ public abstract class BspService<I extends WritableComparable,
     return fs;
   }
 
-  public final ImmutableClassesGiraphConfiguration<I, V, E, M>
+  public final ImmutableClassesGiraphConfiguration<I, V, E>
   getConfiguration() {
     return conf;
   }
@@ -626,7 +624,7 @@ public abstract class BspService<I extends WritableComparable,
     return taskPartition;
   }
 
-  public final GraphTaskManager<I, V, E, M> getGraphTaskManager() {
+  public final GraphTaskManager<I, V, E> getGraphTaskManager() {
     return graphTaskManager;
   }
 
@@ -885,7 +883,7 @@ public abstract class BspService<I extends WritableComparable,
    *
    * @return Instantiated graph partitioner factory
    */
-  protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() {
+  protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
     return graphPartitionerFactory;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index 2281903..ff3e427 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface CentralizedService<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
 
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 5f84ece..fb98b00 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -33,12 +33,11 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceMaster<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    CentralizedService<I, V, E, M> {
+    V extends Writable, E extends Writable> extends
+    CentralizedService<I, V, E> {
   /**
    * Setup (must be called prior to any other function)
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 1c7bde4..4b0f985 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -21,7 +21,6 @@ package org.apache.giraph.bsp;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.graph.FinishedSuperstepStats;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.GraphTaskManager;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -47,12 +46,11 @@ import java.util.List;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceWorker<I extends WritableComparable,
-  V extends Writable, E extends Writable, M extends Writable>
-  extends CentralizedService<I, V, E, M> {
+  V extends Writable, E extends Writable>
+  extends CentralizedService<I, V, E> {
   /**
    * Setup (must be called prior to any other function)
    *
@@ -73,7 +71,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    *
    * @return Worker client
    */
-  WorkerClient<I, V, E, M> getWorkerClient();
+  WorkerClient<I, V, E> getWorkerClient();
 
   /**
    * Get the worker context.
@@ -97,7 +95,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    *
    * @return The partition store for this worker.
    */
-  PartitionStore<I, V, E, M> getPartitionStore();
+  PartitionStore<I, V, E> getPartitionStore();
 
   /**
    *  Both the vertices and the messages need to be checkpointed in order
@@ -121,23 +119,19 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * Take all steps prior to actually beginning the computation of a
    * superstep.
    *
-   * @param graphState Current graph state
    * @return Collection of all the partition owners from the master for this
    *         superstep.
    */
-  Collection<? extends PartitionOwner> startSuperstep(
-      GraphState<I, V, E, M> graphState);
+  Collection<? extends PartitionOwner> startSuperstep();
 
   /**
    * Worker is done with its portion of the superstep.  Report the
    * worker level statistics after the computation.
    *
-   * @param graphState Current graph state
    * @param partitionStatsList All the partition stats for this worker
    * @return Stats of the superstep completion
    */
   FinishedSuperstepStats finishSuperstep(
-      GraphState<I, V, E, M> graphState,
       List<PartitionStats> partitionStatsList);
 
   /**
@@ -195,7 +189,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    *
    * @return the GraphTaskManager instance for this compute node
    */
-  GraphTaskManager<I, V, E, M> getGraphTaskManager();
+  GraphTaskManager<I, V, E> getGraphTaskManager();
 
   /**
    * Operations that will be called if there is a failure by a worker.
@@ -207,7 +201,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    *
    * @return Server data
    */
-  ServerData<I, V, E, M> getServerData();
+  ServerData<I, V, E> getServerData();
 
   /**
    * Get worker aggregator handler

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
index 1e8bdf9..92d0926 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
@@ -64,7 +64,7 @@ public abstract class SendCache<I extends WritableComparable, T,
    *                              ratio of the average request size)
    */
   public SendCache(ImmutableClassesGiraphConfiguration conf,
-                   CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+                   CentralizedServiceWorker<?, ?, ?> serviceWorker,
                    int maxRequestSize,
                    float additionalRequestSize) {
     this.conf = conf;
@@ -176,4 +176,8 @@ public abstract class SendCache<I extends WritableComparable, T,
     }
     return allData;
   }
+
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index fbc911f..5513da2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -46,7 +46,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
    * @param serviceWorker Service worker
    */
   public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
-                       CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+                       CentralizedServiceWorker<?, ?, ?> serviceWorker) {
     super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
         ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 7d2a888..40023c2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -45,14 +45,15 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    * @param serviceWorker Service worker
    */
   public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+      CentralizedServiceWorker<?, ?, ?> serviceWorker) {
     super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
         ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
   }
 
   @Override
   public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
-    return new ByteArrayVertexIdMessages<I, M>();
+    return new ByteArrayVertexIdMessages<I, M>(
+        getConf().getOutgoingMessageValueClass());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
index 67f74f1..9348e61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
@@ -33,14 +33,13 @@ import java.util.Map;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class SendMutationsCache<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /** Internal cache */
-  private Map<Integer, Map<I, VertexMutations<I, V, E, M>>> mutationCache =
-      new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
+  private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache =
+      new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
   /** Number of mutations in each partition */
   private final Map<Integer, Integer> mutationCountMap =
       new HashMap<Integer, Integer>();
@@ -53,17 +52,17 @@ public class SendMutationsCache<I extends WritableComparable,
    * @param destVertexId Destination vertex id
    * @return Mutations for the vertex
    */
-  private VertexMutations<I, V, E, M> getVertexMutations(
+  private VertexMutations<I, V, E> getVertexMutations(
       Integer partitionId, I destVertexId) {
-    Map<I, VertexMutations<I, V, E, M>> idMutations =
+    Map<I, VertexMutations<I, V, E>> idMutations =
         mutationCache.get(partitionId);
     if (idMutations == null) {
-      idMutations = new HashMap<I, VertexMutations<I, V, E, M>>();
+      idMutations = new HashMap<I, VertexMutations<I, V, E>>();
       mutationCache.put(partitionId, idMutations);
     }
-    VertexMutations<I, V, E, M> mutations = idMutations.get(destVertexId);
+    VertexMutations<I, V, E> mutations = idMutations.get(destVertexId);
     if (mutations == null) {
-      mutations = new VertexMutations<I, V, E, M>();
+      mutations = new VertexMutations<I, V, E>();
       idMutations.put(destVertexId, mutations);
     }
     return mutations;
@@ -97,7 +96,7 @@ public class SendMutationsCache<I extends WritableComparable,
   public int addEdgeMutation(
       Integer partitionId, I destVertexId, Edge<I, E> edge) {
     // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
+    VertexMutations<I, V, E> mutations =
         getVertexMutations(partitionId, destVertexId);
 
     // Add the edge
@@ -118,7 +117,7 @@ public class SendMutationsCache<I extends WritableComparable,
   public int removeEdgeMutation(
       Integer partitionId, I vertexIndex, I destinationVertexIndex) {
     // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
+    VertexMutations<I, V, E> mutations =
         getVertexMutations(partitionId, vertexIndex);
 
     // Remove the edge
@@ -136,9 +135,9 @@ public class SendMutationsCache<I extends WritableComparable,
    * @return Number of mutations in the partition.
    */
   public int addVertexMutation(
-      Integer partitionId, Vertex<I, V, E, M> vertex) {
+      Integer partitionId, Vertex<I, V, E> vertex) {
     // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
+    VertexMutations<I, V, E> mutations =
         getVertexMutations(partitionId, vertex.getId());
 
     // Add the vertex
@@ -158,7 +157,7 @@ public class SendMutationsCache<I extends WritableComparable,
   public int removeVertexMutation(
       Integer partitionId, I destVertexId) {
     // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
+    VertexMutations<I, V, E> mutations =
         getVertexMutations(partitionId, destVertexId);
 
     // Remove the vertex
@@ -174,9 +173,9 @@ public class SendMutationsCache<I extends WritableComparable,
    * @param partitionId Partition id
    * @return Removed partition mutations
    */
-  public Map<I, VertexMutations<I, V, E, M>> removePartitionMutations(
+  public Map<I, VertexMutations<I, V, E>> removePartitionMutations(
       int partitionId) {
-    Map<I, VertexMutations<I, V, E, M>> idMutations =
+    Map<I, VertexMutations<I, V, E>> idMutations =
         mutationCache.remove(partitionId);
     mutationCountMap.put(partitionId, 0);
     return idMutations;
@@ -187,12 +186,12 @@ public class SendMutationsCache<I extends WritableComparable,
    *
    * @return All vertex mutations for all partitions
    */
-  public Map<Integer, Map<I, VertexMutations<I, V, E, M>>>
+  public Map<Integer, Map<I, VertexMutations<I, V, E>>>
   removeAllPartitionMutations() {
-    Map<Integer, Map<I, VertexMutations<I, V, E, M>>> allMutations =
+    Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations =
         mutationCache;
     mutationCache =
-        new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
+        new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
     mutationCountMap.clear();
     return allMutations;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
index 31cf052..68ce095 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
@@ -36,23 +36,19 @@ import org.apache.log4j.Logger;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public class SendPartitionCache<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendPartitionCache.class);
   /** Input split vertex cache (only used when loading from input split) */
-  private final Map<PartitionOwner, Partition<I, V, E, M>>
+  private final Map<PartitionOwner, Partition<I, V, E>>
   ownerPartitionMap = Maps.newHashMap();
-  /** Number of messages in each partition */
-  private final Map<PartitionOwner, Integer> messageCountMap =
-      Maps.newHashMap();
   /** Context */
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
   /**
    *  Regulates the size of outgoing Collections of vertices read
    * by the local worker during INPUT_SUPERSTEP that are to be
@@ -69,7 +65,7 @@ public class SendPartitionCache<I extends WritableComparable,
    */
   public SendPartitionCache(
       Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
     this.context = context;
     this.configuration = configuration;
     transferRegulator =
@@ -89,9 +85,9 @@ public class SendPartitionCache<I extends WritableComparable,
    * @param vertex Vertex to add
    * @return A partition to send or null, if requirements are not met
    */
-  public Partition<I, V, E, M> addVertex(PartitionOwner partitionOwner,
-                                         Vertex<I, V, E, M> vertex) {
-    Partition<I, V, E, M> partition =
+  public Partition<I, V, E> addVertex(PartitionOwner partitionOwner,
+                                         Vertex<I, V, E> vertex) {
+    Partition<I, V, E> partition =
         ownerPartitionMap.get(partitionOwner);
     if (partition == null) {
       partition = configuration.createPartition(
@@ -101,7 +97,7 @@ public class SendPartitionCache<I extends WritableComparable,
     }
     transferRegulator.incrementCounters(partitionOwner, vertex);
 
-    Vertex<I, V, E, M> oldVertex =
+    Vertex<I, V, E> oldVertex =
         partition.putVertex(vertex);
     if (oldVertex != null) {
       LOG.warn("addVertex: Replacing vertex " + oldVertex +
@@ -121,7 +117,7 @@ public class SendPartitionCache<I extends WritableComparable,
    *
    * @return Owner partition map
    */
-  public Map<PartitionOwner, Partition<I, V, E, M>> getOwnerPartitionMap() {
+  public Map<PartitionOwner, Partition<I, V, E>> getOwnerPartitionMap() {
     return ownerPartitionMap;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 743a6f8..788be53 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -43,34 +43,38 @@ import java.util.concurrent.ConcurrentHashMap;
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class ServerData<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Partition store for this worker. */
-  private volatile PartitionStore<I, V, E, M> partitionStore;
+  private volatile PartitionStore<I, V, E> partitionStore;
   /** Edge store for this worker. */
-  private final EdgeStore<I, V, E, M> edgeStore;
+  private final EdgeStore<I, V, E> edgeStore;
   /** Message store factory */
   private final
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
+  MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+  messageStoreFactory;
   /**
    * Message store for incoming messages (messages which will be consumed
    * in the next super step)
    */
-  private volatile MessageStoreByPartition<I, M> incomingMessageStore;
+  private volatile MessageStoreByPartition<I, Writable>
+  incomingMessageStore;
   /**
    * Message store for current messages (messages which we received in
    * previous super step and which will be consumed in current super step)
    */
-  private volatile MessageStoreByPartition<I, M> currentMessageStore;
+  private volatile MessageStoreByPartition<I, Writable>
+  currentMessageStore;
   /**
    * Map of partition ids to incoming vertex mutations from other workers.
    * (Synchronized access to values)
    */
-  private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
-  vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
+  private final ConcurrentHashMap<I, VertexMutations<I, V, E>>
+  vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E>>();
   /**
    * Holds aggregtors which current worker owns from current superstep
    */
@@ -89,28 +93,30 @@ public class ServerData<I extends WritableComparable,
    * @param context Mapper context
    */
   public ServerData(
-      CentralizedServiceWorker<I, V, E, M> service,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
-      MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+      CentralizedServiceWorker<I, V, E> service,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
           messageStoreFactory,
       Mapper<?, ?, ?, ?>.Context context) {
-
+    this.conf = conf;
     this.messageStoreFactory = messageStoreFactory;
-    currentMessageStore = messageStoreFactory.newStore();
-    incomingMessageStore = messageStoreFactory.newStore();
+    currentMessageStore =
+        messageStoreFactory.newStore(conf.getIncomingMessageValueClass());
+    incomingMessageStore =
+        messageStoreFactory.newStore(conf.getOutgoingMessageValueClass());
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
       partitionStore =
-          new DiskBackedPartitionStore<I, V, E, M>(conf, context);
+          new DiskBackedPartitionStore<I, V, E>(conf, context);
     } else {
       partitionStore =
-          new SimplePartitionStore<I, V, E, M>(conf, context);
+          new SimplePartitionStore<I, V, E>(conf, context);
     }
-    edgeStore = new EdgeStore<I, V, E, M>(service, conf, context);
+    edgeStore = new EdgeStore<I, V, E>(service, conf, context);
     ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
     allAggregatorData = new AllAggregatorServerData(context, conf);
   }
 
-  public EdgeStore<I, V, E, M> getEdgeStore() {
+  public EdgeStore<I, V, E> getEdgeStore() {
     return edgeStore;
   }
 
@@ -119,7 +125,7 @@ public class ServerData<I extends WritableComparable,
    *
    * @return The partition store
    */
-  public PartitionStore<I, V, E, M> getPartitionStore() {
+  public PartitionStore<I, V, E> getPartitionStore() {
     return partitionStore;
   }
 
@@ -127,20 +133,24 @@ public class ServerData<I extends WritableComparable,
    * Get message store for incoming messages (messages which will be consumed
    * in the next super step)
    *
+   * @param <M> Message data
    * @return Incoming message store
    */
-  public MessageStoreByPartition<I, M> getIncomingMessageStore() {
-    return incomingMessageStore;
+  public <M extends Writable> MessageStoreByPartition<I, M>
+  getIncomingMessageStore() {
+    return (MessageStoreByPartition<I, M>) incomingMessageStore;
   }
 
   /**
    * Get message store for current messages (messages which we received in
    * previous super step and which will be consumed in current super step)
    *
+   * @param <M> Message data
    * @return Current message store
    */
-  public MessageStoreByPartition<I, M> getCurrentMessageStore() {
-    return currentMessageStore;
+  public <M extends Writable> MessageStoreByPartition<I, M>
+  getCurrentMessageStore() {
+    return (MessageStoreByPartition<I, M>) currentMessageStore;
   }
 
   /** Prepare for next super step */
@@ -154,7 +164,8 @@ public class ServerData<I extends WritableComparable,
       }
     }
     currentMessageStore = incomingMessageStore;
-    incomingMessageStore = messageStoreFactory.newStore();
+    incomingMessageStore =
+        messageStoreFactory.newStore(conf.getOutgoingMessageValueClass());
   }
 
   /**
@@ -162,7 +173,7 @@ public class ServerData<I extends WritableComparable,
    *
    * @return Vertex mutations
    */
-  public ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
+  public ConcurrentHashMap<I, VertexMutations<I, V, E>>
   getVertexMutations() {
     return vertexMutations;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
index ebabf45..3759f6b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
@@ -33,11 +33,10 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface WorkerClient<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
 
   /**
    *  Setup the client.

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index bc0637f..731d0ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -33,10 +33,9 @@ import java.io.IOException;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 public interface WorkerClientRequestProcessor<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable> {
   /**
    * Sends a message to destination vertex.
    *
@@ -44,7 +43,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    * @param message Message to send.
    * @return true if any network I/O occurred.
    */
-  boolean sendMessageRequest(I destVertexId, M message);
+  boolean sendMessageRequest(I destVertexId, Writable message);
 
   /**
    * Sends a vertex to the appropriate partition owner
@@ -53,7 +52,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    * @param vertex Vertex to send
    */
   void sendVertexRequest(PartitionOwner partitionOwner,
-                         Vertex<I, V, E, M> vertex);
+                         Vertex<I, V, E> vertex);
 
   /**
    * Send a partition request (no batching).
@@ -62,7 +61,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    * @param partition Partition to send
    */
   void sendPartitionRequest(WorkerInfo workerInfo,
-                            Partition<I, V, E, M> partition);
+                            Partition<I, V, E> partition);
 
   /**
    * Sends a request to the appropriate vertex range owner to add an edge
@@ -103,7 +102,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
    * @param vertex Vertex to be added
    * @throws IOException
    */
-  void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException;
+  void addVertexRequest(Vertex<I, V, E> vertex) throws IOException;
 
   /**
    * Sends a request to the appropriate vertex range owner to remove a vertex

http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index e373b2c..bed07b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import org.apache.giraph.graph.GraphState;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -31,11 +30,10 @@ import java.net.InetSocketAddress;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
- * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface WorkerServer<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
+    V extends Writable, E extends Writable>
     extends Closeable {
   /**
    * Get server address
@@ -46,17 +44,15 @@ public interface WorkerServer<I extends WritableComparable,
 
   /**
    * Prepare incoming messages for computation, and resolve mutation requests.
-   *
-   * @param graphState Current graph state
    */
-  void prepareSuperstep(GraphState<I, V, E, M> graphState);
+  void prepareSuperstep();
 
   /**
    * Get server data
    *
    * @return Server data
    */
-  ServerData<I, V, E, M> getServerData();
+  ServerData<I, V, E> getServerData();
 
   /**
    * Shuts down.