You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:08 UTC
[12/12] git commit: updated refs/heads/trunk to 8811165
GIRAPH-667: Decouple Vertex data and Computation, make Computation and Combiner classes switchable (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8811165e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8811165e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8811165e
Branch: refs/heads/trunk
Commit: 8811165e85ae2db442e34f1021db29f4dfcc8430
Parents: 9f7a347
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Mon May 20 10:24:19 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Mon May 20 10:26:04 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 3 +
.../io/accumulo/AccumuloVertexInputFormat.java | 6 +-
.../io/accumulo/TestAccumuloVertexFormat.java | 18 +-
.../edgemarker/AccumuloEdgeInputFormat.java | 4 +-
.../edgemarker/AccumuloEdgeOutputFormat.java | 2 +-
.../giraph/benchmark/AggregatorsBenchmark.java | 14 +-
.../apache/giraph/benchmark/PageRankBenchmark.java | 6 +-
.../giraph/benchmark/PageRankComputation.java | 58 +++
.../apache/giraph/benchmark/PageRankVertex.java | 55 ---
.../giraph/benchmark/RandomMessageBenchmark.java | 23 +-
.../giraph/benchmark/ShortestPathsBenchmark.java | 4 +-
.../giraph/benchmark/ShortestPathsComputation.java | 76 ++++
.../giraph/benchmark/ShortestPathsVertex.java | 65 ----
.../benchmark/WeightedPageRankBenchmark.java | 6 +-
.../benchmark/WeightedPageRankComputation.java | 72 ++++
.../giraph/benchmark/WeightedPageRankVertex.java | 66 ----
.../java/org/apache/giraph/bsp/BspService.java | 22 +-
.../org/apache/giraph/bsp/CentralizedService.java | 3 +-
.../giraph/bsp/CentralizedServiceMaster.java | 5 +-
.../giraph/bsp/CentralizedServiceWorker.java | 20 +-
.../java/org/apache/giraph/comm/SendCache.java | 6 +-
.../java/org/apache/giraph/comm/SendEdgeCache.java | 2 +-
.../org/apache/giraph/comm/SendMessageCache.java | 5 +-
.../org/apache/giraph/comm/SendMutationsCache.java | 37 +-
.../org/apache/giraph/comm/SendPartitionCache.java | 22 +-
.../java/org/apache/giraph/comm/ServerData.java | 63 ++--
.../java/org/apache/giraph/comm/WorkerClient.java | 3 +-
.../giraph/comm/WorkerClientRequestProcessor.java | 11 +-
.../java/org/apache/giraph/comm/WorkerServer.java | 10 +-
.../messages/ByteArrayMessagesPerVertexStore.java | 29 +-
.../comm/messages/DiskBackedMessageStore.java | 25 +-
.../DiskBackedMessageStoreByPartition.java | 30 +-
.../comm/messages/InMemoryMessageStoreFactory.java | 76 ++++
.../giraph/comm/messages/MessageStoreFactory.java | 8 +-
.../comm/messages/OneMessagePerVertexStore.java | 32 +-
.../comm/messages/SequentialFileMessageStore.java | 24 +-
.../giraph/comm/messages/SimpleMessageStore.java | 13 +-
.../giraph/comm/netty/NettyMasterClient.java | 4 +-
.../giraph/comm/netty/NettyMasterServer.java | 2 +-
.../NettyWorkerAggregatorRequestProcessor.java | 8 +-
.../giraph/comm/netty/NettyWorkerClient.java | 15 +-
.../netty/NettyWorkerClientRequestProcessor.java | 84 ++--
.../giraph/comm/netty/NettyWorkerServer.java | 78 ++---
.../netty/handler/WorkerRequestServerHandler.java | 18 +-
.../SendPartitionCurrentMessagesRequest.java | 11 +-
.../requests/SendPartitionMutationsRequest.java | 23 +-
.../giraph/comm/requests/SendVertexRequest.java | 11 +-
.../comm/requests/SendWorkerMessagesRequest.java | 3 +-
.../apache/giraph/comm/requests/WorkerRequest.java | 5 +-
.../giraph/comm/requests/WritableRequest.java | 13 +-
.../java/org/apache/giraph/conf/AllOptions.java | 4 +-
.../DefaultImmutableClassesGiraphConfigurable.java | 11 +-
.../java/org/apache/giraph/conf/GiraphClasses.java | 176 +++++-----
.../apache/giraph/conf/GiraphConfiguration.java | 34 +--
.../org/apache/giraph/conf/GiraphConstants.java | 26 +-
.../conf/ImmutableClassesGiraphConfigurable.java | 9 +-
.../conf/ImmutableClassesGiraphConfiguration.java | 178 +++++-----
.../apache/giraph/edge/ConfigurableOutEdges.java | 4 +-
.../java/org/apache/giraph/edge/EdgeStore.java | 15 +-
.../apache/giraph/edge/MutableEdgesIterable.java | 4 +-
.../apache/giraph/edge/MutableEdgesWrapper.java | 2 +-
.../org/apache/giraph/graph/BasicComputation.java | 35 ++
.../java/org/apache/giraph/graph/Computation.java | 260 +++++++++++++
.../org/apache/giraph/graph/ComputeCallable.java | 80 ++---
.../apache/giraph/graph/DefaultVertexResolver.java | 69 ++---
.../giraph/graph/DefaultVertexValueFactory.java | 2 +-
.../giraph/graph/GiraphTransferRegulator.java | 5 +-
.../java/org/apache/giraph/graph/GraphMapper.java | 7 +-
.../java/org/apache/giraph/graph/GraphState.java | 64 +---
.../org/apache/giraph/graph/GraphStateAware.java | 47 ---
.../org/apache/giraph/graph/GraphTaskManager.java | 110 +++---
.../main/java/org/apache/giraph/graph/Vertex.java | 193 +----------
.../org/apache/giraph/graph/VertexChanges.java | 5 +-
.../org/apache/giraph/graph/VertexMutations.java | 22 +-
.../org/apache/giraph/graph/VertexResolver.java | 17 +-
.../apache/giraph/graph/VertexValueFactory.java | 2 +-
.../SuperstepHashPartitionerFactory.java | 14 +-
.../main/java/org/apache/giraph/io/EdgeReader.java | 2 +-
.../org/apache/giraph/io/GiraphInputFormat.java | 2 +-
.../org/apache/giraph/io/SimpleVertexWriter.java | 2 +-
.../org/apache/giraph/io/VertexOutputFormat.java | 2 +-
.../java/org/apache/giraph/io/VertexReader.java | 4 +-
.../org/apache/giraph/io/VertexValueReader.java | 4 +-
.../java/org/apache/giraph/io/VertexWriter.java | 2 +-
.../giraph/io/filters/DefaultEdgeInputFilter.java | 2 +-
.../io/filters/DefaultVertexInputFilter.java | 6 +-
.../giraph/io/filters/VertexInputFilter.java | 5 +-
.../AdjacencyListTextVertexOutputFormat.java | 2 +-
.../io/formats/IdWithValueTextOutputFormat.java | 2 +-
.../formats/IntIntTextVertexValueInputFormat.java | 6 +-
.../io/formats/JsonBase64VertexOutputFormat.java | 2 +-
...JsonLongDoubleFloatDoubleVertexInputFormat.java | 5 +-
...sonLongDoubleFloatDoubleVertexOutputFormat.java | 3 +-
.../PseudoRandomIntNullVertexInputFormat.java | 4 +-
.../io/formats/PseudoRandomVertexInputFormat.java | 4 +-
.../io/formats/SequenceFileVertexInputFormat.java | 6 +-
.../io/formats/SequenceFileVertexOutputFormat.java | 2 +-
...DoubleDoubleAdjacencyListVertexInputFormat.java | 5 +-
.../giraph/io/formats/TextVertexInputFormat.java | 14 +-
.../giraph/io/formats/TextVertexOutputFormat.java | 2 +-
.../io/formats/TextVertexValueInputFormat.java | 3 +-
.../formats/multi/EdgeInputFormatDescription.java | 12 +-
.../io/formats/multi/MultiEdgeInputFormat.java | 2 +-
.../io/formats/multi/MultiVertexInputFormat.java | 2 +-
.../multi/VertexInputFormatDescription.java | 12 +-
.../giraph/io/internal/WrappedEdgeReader.java | 4 +-
.../io/internal/WrappedVertexOutputFormat.java | 4 +-
.../giraph/io/internal/WrappedVertexReader.java | 6 +-
.../giraph/io/iterables/EdgeReaderWrapper.java | 2 +-
.../giraph/io/iterables/VertexReaderWrapper.java | 12 +-
.../MultiThreadedSuperstepOutput.java | 8 +-
.../io/superstep_output/NoOpSuperstepOutput.java | 2 +-
.../SynchronizedSuperstepOutput.java | 7 +-
.../giraph/job/GiraphConfigurationValidator.java | 52 ++--
.../org/apache/giraph/master/BspServiceMaster.java | 48 ++-
.../giraph/master/MasterAggregatorHandler.java | 4 +-
.../org/apache/giraph/master/MasterCompute.java | 98 +++---
.../org/apache/giraph/master/MasterThread.java | 7 +-
.../org/apache/giraph/master/SuperstepClasses.java | 160 ++++++++
.../apache/giraph/partition/BasicPartition.java | 20 +-
.../giraph/partition/ByteArrayPartition.java | 29 +-
.../giraph/partition/DefaultPartitionContext.java | 34 --
.../giraph/partition/DiskBackedPartitionStore.java | 78 ++--
.../giraph/partition/GraphPartitionerFactory.java | 7 +-
.../giraph/partition/HashMasterPartitioner.java | 5 +-
.../giraph/partition/HashPartitionerFactory.java | 13 +-
.../partition/HashRangePartitionerFactory.java | 15 +-
.../partition/HashRangeWorkerPartitioner.java | 5 +-
.../giraph/partition/HashWorkerPartitioner.java | 9 +-
.../giraph/partition/MasterGraphPartitioner.java | 3 +-
.../org/apache/giraph/partition/Partition.java | 24 +-
.../apache/giraph/partition/PartitionContext.java | 45 ---
.../apache/giraph/partition/PartitionStore.java | 13 +-
.../giraph/partition/RangeMasterPartitioner.java | 5 +-
.../giraph/partition/RangePartitionerFactory.java | 5 +-
.../apache/giraph/partition/RangeSplitHint.java | 2 +-
.../giraph/partition/RangeWorkerPartitioner.java | 5 +-
.../giraph/partition/ReusesObjectsPartition.java | 4 +-
.../SimpleIntRangePartitionerFactory.java | 13 +-
.../SimpleLongRangePartitionerFactory.java | 13 +-
.../apache/giraph/partition/SimplePartition.java | 31 +-
.../giraph/partition/SimplePartitionStore.java | 21 +-
.../partition/SimpleRangeMasterPartitioner.java | 5 +-
.../partition/SimpleRangeWorkerPartitioner.java | 10 +-
.../giraph/partition/WorkerGraphPartitioner.java | 7 +-
.../apache/giraph/utils/ByteArrayVertexIdData.java | 4 +-
.../giraph/utils/ByteArrayVertexIdEdges.java | 4 +-
.../giraph/utils/ByteArrayVertexIdMessages.java | 26 +-
.../apache/giraph/utils/ConfigurationUtils.java | 17 +-
.../giraph/utils/InMemoryVertexInputFormat.java | 6 +-
.../apache/giraph/utils/InternalVertexRunner.java | 14 +-
.../org/apache/giraph/utils/ReflectionUtils.java | 20 +
.../java/org/apache/giraph/utils/TestGraph.java | 32 +-
.../org/apache/giraph/utils/VertexIdIterator.java | 2 +-
.../org/apache/giraph/utils/WritableUtils.java | 109 ++++--
.../org/apache/giraph/worker/BspServiceWorker.java | 125 +++----
.../giraph/worker/EdgeInputSplitsCallable.java | 25 +-
.../worker/EdgeInputSplitsCallableFactory.java | 22 +-
.../apache/giraph/worker/InputSplitsCallable.java | 40 +--
.../giraph/worker/VertexInputSplitsCallable.java | 34 +--
.../worker/VertexInputSplitsCallableFactory.java | 22 +-
.../giraph/worker/WorkerAggregatorHandler.java | 4 +-
.../org/apache/giraph/worker/WorkerContext.java | 16 +-
.../org/apache/giraph/comm/ConnectionTest.java | 17 +-
.../org/apache/giraph/comm/RequestFailureTest.java | 22 +-
.../java/org/apache/giraph/comm/RequestTest.java | 58 ++--
.../org/apache/giraph/comm/SaslConnectionTest.java | 13 +-
.../org/apache/giraph/comm/TestMessageStores.java | 38 +--
.../org/apache/giraph/conf/TestObjectCreation.java | 20 +-
.../apache/giraph/graph/TestVertexAndEdges.java | 37 +-
.../TestAdjacencyListTextVertexOutputFormat.java | 20 +-
.../java/org/apache/giraph/io/TestEdgeInput.java | 29 +-
.../java/org/apache/giraph/io/TestFilters.java | 12 +-
.../giraph/io/TestIdWithValueTextOutputFormat.java | 17 +-
.../org/apache/giraph/io/TestJsonBase64Format.java | 17 +-
...DoubleDoubleAdjacencyListVertexInputFormat.java | 33 +-
...DoubleDoubleAdjacencyListVertexInputFormat.java | 55 +--
.../master/TestComputationCombinerTypes.java | 160 ++++++++
.../apache/giraph/master/TestMasterObserver.java | 55 +++-
.../apache/giraph/master/TestSwitchClasses.java | 268 +++++++++++++
.../partition/TestGiraphTransferRegulator.java | 13 +-
.../giraph/partition/TestPartitionStores.java | 111 +++----
.../apache/giraph/utils/ComputationCountEdges.java | 41 ++
.../giraph/utils/IntIntNullNoOpComputation.java | 30 ++
.../apache/giraph/utils/IntNoOpComputation.java | 29 ++
.../apache/giraph/utils/LongNoOpComputation.java | 29 ++
.../java/org/apache/giraph/utils/MockUtils.java | 87 ++---
.../org/apache/giraph/utils/NoOpComputation.java | 44 +++
.../giraph/vertices/IntIntNullVertexDoNothing.java | 25 --
.../apache/giraph/vertices/VertexCountEdges.java | 33 --
.../apache/giraph/vertices/VertexDoNothing.java | 33 --
.../java/org/apache/giraph/yarn/TestYarnJob.java | 12 +-
.../examples/AggregatorsTestComputation.java | 137 +++++++
.../giraph/examples/AggregatorsTestVertex.java | 134 -------
.../examples/ConnectedComponentsComputation.java | 105 ++++++
.../giraph/examples/ConnectedComponentsVertex.java | 101 -----
.../giraph/examples/IdentityComputation.java | 47 +++
.../org/apache/giraph/examples/IdentityVertex.java | 45 ---
.../examples/LongDoubleDoubleTextInputFormat.java | 13 +-
.../examples/LongDoubleNullTextInputFormat.java | 13 +-
...NormalizingLongDoubleDoubleTextInputFormat.java | 15 +-
.../giraph/examples/PageRankComputation.java | 55 +++
.../org/apache/giraph/examples/PageRankVertex.java | 54 ---
.../examples/PartitionContextTestVertex.java | 115 ------
.../giraph/examples/RandomWalkComputation.java | 173 +++++++++
.../apache/giraph/examples/RandomWalkVertex.java | 163 --------
.../examples/RandomWalkVertexMasterCompute.java | 18 +-
.../examples/RandomWalkWithRestartComputation.java | 85 +++++
.../examples/RandomWalkWithRestartVertex.java | 80 ----
.../giraph/examples/RandomWalkWorkerContext.java | 8 +-
.../apache/giraph/examples/SimpleCheckpoint.java | 292 +++++++++++++++
.../giraph/examples/SimpleCheckpointVertex.java | 286 --------------
.../giraph/examples/SimpleCombinerComputation.java | 70 ++++
.../giraph/examples/SimpleCombinerVertex.java | 65 ----
.../giraph/examples/SimpleFailComputation.java | 74 ++++
.../apache/giraph/examples/SimpleFailVertex.java | 69 ----
.../examples/SimpleInDegreeCountComputation.java | 58 +++
.../giraph/examples/SimpleInDegreeCountVertex.java | 53 ---
...eLongDoubleDoubleDoubleIdentityComputation.java | 32 ++
...SimpleLongDoubleDoubleDoubleIdentityVertex.java | 32 --
.../examples/SimpleMasterComputeComputation.java | 112 ++++++
.../giraph/examples/SimpleMasterComputeVertex.java | 108 ------
.../giraph/examples/SimpleMsgComputation.java | 67 ++++
.../apache/giraph/examples/SimpleMsgVertex.java | 62 ---
.../examples/SimpleMutateGraphComputation.java | 198 ++++++++++
.../giraph/examples/SimpleMutateGraphVertex.java | 197 ----------
.../examples/SimpleOutDegreeCountComputation.java | 46 +++
.../examples/SimpleOutDegreeCountVertex.java | 43 ---
.../giraph/examples/SimplePageRankComputation.java | 250 ++++++++++++
.../giraph/examples/SimplePageRankVertex.java | 248 ------------
.../examples/SimpleShortestPathsComputation.java | 86 +++++
.../giraph/examples/SimpleShortestPathsVertex.java | 81 ----
.../examples/SimpleSuperstepComputation.java | 152 ++++++++
.../giraph/examples/SimpleSuperstepVertex.java | 150 --------
.../examples/SimpleTextVertexOutputFormat.java | 2 +-
.../examples/SimpleTriangleClosingComputation.java | 154 ++++++++
.../examples/SimpleTriangleClosingVertex.java | 151 --------
.../examples/SimpleVertexWithWorkerContext.java | 22 +-
.../examples/TestComputationStateComputation.java | 109 ++++++
.../org/apache/giraph/examples/VerifyMessage.java | 50 ++--
...xWithDoubleValueDoubleEdgeTextOutputFormat.java | 2 +-
...texWithDoubleValueNullEdgeTextOutputFormat.java | 2 +-
.../java/org/apache/giraph/TestAutoCheckpoint.java | 16 +-
.../test/java/org/apache/giraph/TestBspBasic.java | 85 ++---
.../org/apache/giraph/TestComputationState.java | 65 ++++
.../org/apache/giraph/TestGraphPartitioner.java | 54 ++--
.../org/apache/giraph/TestManualCheckpoint.java | 28 +-
.../java/org/apache/giraph/TestMaxSuperstep.java | 18 +-
.../java/org/apache/giraph/TestMutateGraph.java | 10 +-
.../org/apache/giraph/TestNotEnoughMapTasks.java | 10 +-
.../org/apache/giraph/TestPartitionContext.java | 67 ----
.../aggregators/TestAggregatorsHandling.java | 26 +-
.../ConnectedComponentsComputationTest.java | 120 ++++++
...ConnectedComponentsComputationTestInMemory.java | 127 +++++++
.../examples/ConnectedComponentsVertexTest.java | 120 ------
.../ConnectedComponentsVertexTestInMemory.java | 130 -------
.../giraph/examples/PageRankComputationTest.java | 79 ++++
.../apache/giraph/examples/PageRankVertexTest.java | 78 ----
.../RandomWalkWithRestartComputationTest.java | 111 ++++++
.../examples/RandomWalkWithRestartVertexTest.java | 109 ------
.../SimpleShortestPathsComputationTest.java | 164 ++++++++
.../examples/SimpleShortestPathsVertexTest.java | 161 --------
.../SimpleTriangleClosingComputationTest.java | 92 +++++
.../examples/SimpleTriangleClosingVertexTest.java | 93 -----
.../org/apache/giraph/examples/TestPageRank.java | 14 +-
.../examples/TryMultiIpcBindingPortsTest.java | 2 +-
.../apache/giraph/vertex/TestComputationTypes.java | 244 ++++++++++++
.../org/apache/giraph/vertex/TestVertexTypes.java | 248 ------------
.../io/hbase/TestHBaseRootMarkerVertextFormat.java | 14 +-
.../io/hbase/edgemarker/TableEdgeInputFormat.java | 4 +-
.../io/hbase/edgemarker/TableEdgeOutputFormat.java | 2 +-
.../giraph/io/hcatalog/HCatGiraphRunner.java | 34 +-
.../io/hcatalog/HCatalogVertexInputFormat.java | 8 +-
.../io/hcatalog/HCatalogVertexOutputFormat.java | 10 +-
.../org/apache/giraph/hive/HiveGiraphRunner.java | 38 +-
.../DefaultConfigurableAndTableSchemaAware.java | 6 +-
.../giraph/hive/input/edge/AbstractHiveToEdge.java | 4 +-
.../hive/input/edge/HiveEdgeInputFormat.java | 2 +-
.../giraph/hive/input/edge/HiveEdgeReader.java | 2 +-
.../hive/input/vertex/AbstractHiveToVertex.java | 4 +-
.../giraph/hive/input/vertex/HiveToVertex.java | 2 +-
.../hive/input/vertex/HiveVertexInputFormat.java | 2 +-
.../giraph/hive/input/vertex/HiveVertexReader.java | 6 +-
.../hive/input/vertex/SimpleHiveToVertex.java | 8 +-
.../giraph/hive/output/AbstractVertexToHive.java | 2 +-
.../giraph/hive/output/HiveVertexOutputFormat.java | 2 +-
.../giraph/hive/output/HiveVertexWriter.java | 2 +-
.../giraph/hive/output/SimpleVertexToHive.java | 4 +-
.../apache/giraph/hive/output/VertexToHive.java | 2 +-
.../output/examples/HiveOutputIntIntVertex.java | 2 +-
.../hive/computations/ComputationCountEdges.java | 36 ++
.../hive/computations/ComputationSumEdges.java | 42 ++
.../giraph/hive/computations/package-info.java | 22 ++
.../giraph/hive/input/HiveEdgeInputTest.java | 10 +-
.../giraph/hive/input/HiveVertexInputTest.java | 12 +-
.../apache/giraph/hive/output/HiveOutputTest.java | 4 +-
.../giraph/hive/vertexes/VertexCountEdges.java | 34 --
.../giraph/hive/vertexes/VertexSumEdges.java | 39 --
.../apache/giraph/hive/vertexes/package-info.java | 22 --
299 files changed, 6685 insertions(+), 6079 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 52f8099..c20ded9 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-667: Decouple Vertex data and Computation, make Computation
+ and Combiner classes switchable (majakabiljo)
+
GIRAPH-608: Spelling error in Combiner.java (Michael Aro via aching)
GIRAPH-666: Netty execs threads and metrics threads don't get
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
index 221fabd..c286ed4 100644
--- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
+++ b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java
@@ -69,7 +69,7 @@ public abstract class AccumuloVertexInputFormat<
extends VertexReader<I, V, E> {
/** Giraph configuration */
- private ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+ private ImmutableClassesGiraphConfiguration<I, V, E>
configuration;
/**
* Used by subclasses to read key/value pairs.
@@ -86,7 +86,7 @@ public abstract class AccumuloVertexInputFormat<
this.reader = reader;
}
- public ImmutableClassesGiraphConfiguration<I, V, E, Writable>
+ public ImmutableClassesGiraphConfiguration<I, V, E>
getConfiguration() {
return configuration;
}
@@ -105,7 +105,7 @@ public abstract class AccumuloVertexInputFormat<
reader.initialize(inputSplit, context);
this.context = context;
this.configuration =
- new ImmutableClassesGiraphConfiguration<I, V, E, Writable>(
+ new ImmutableClassesGiraphConfiguration<I, V, E>(
context.getConfiguration());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
index 6698c9a..0ee9666 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.giraph.BspCase;
+import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat;
import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat;
@@ -133,7 +134,7 @@ public class TestAccumuloVertexFormat extends BspCase{
GiraphJob job = new GiraphJob(conf, getCallingMethodName());
setupConfiguration(job);
GiraphConfiguration giraphConf = job.getConfiguration();
- giraphConf.setVertexClass(EdgeNotification.class);
+ giraphConf.setComputationClass(EdgeNotification.class);
giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class);
giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class);
@@ -170,16 +171,17 @@ public class TestAccumuloVertexFormat extends BspCase{
The test set only has a 1-1 parent-to-child ratio for this unit test.
*/
public static class EdgeNotification
- extends Vertex<Text, Text, Text, Text> {
- @Override
- public void compute(Iterable<Text> messages) throws IOException {
+ extends BasicComputation<Text, Text, Text, Text> {
+ @Override
+ public void compute(Vertex<Text, Text, Text> vertex,
+ Iterable<Text> messages) throws IOException {
for (Text message : messages) {
- getValue().set(message);
+ vertex.getValue().set(message);
}
if(getSuperstep() == 0) {
- sendMessageToAllEdges(getId());
+ sendMessageToAllEdges(vertex, vertex.getId());
}
- voteToHalt();
- }
+ vertex.voteToHalt();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
index 5855cfc..108ae61 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
@@ -73,11 +73,11 @@ public class AccumuloEdgeInputFormat
/*
Each Key/Value contains the information needed to construct the vertices.
*/
- public Vertex<Text, Text, Text, ?> getCurrentVertex()
+ public Vertex<Text, Text, Text> getCurrentVertex()
throws IOException, InterruptedException {
Key key = getRecordReader().getCurrentKey();
Value value = getRecordReader().getCurrentValue();
- Vertex<Text, Text, Text, ?> vertex =
+ Vertex<Text, Text, Text> vertex =
getConfiguration().createVertex();
Text vertexId = key.getRow();
List<Edge<Text, Text>> edges = Lists.newLinkedList();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
index f999497..c2ebbe2 100644
--- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
+++ b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java
@@ -64,7 +64,7 @@ public class AccumuloEdgeOutputFormat
Write back a mutation that adds a qualifier for 'parent' containing the vertex value
as the cell value. Assume the vertex ID corresponds to a key.
*/
- public void writeVertex(Vertex<Text, Text, Text, ?> vertex)
+ public void writeVertex(Vertex<Text, Text, Text> vertex)
throws IOException, InterruptedException {
RecordWriter<Text, Mutation> writer = getRecordWriter();
Mutation mt = new Mutation(vertex.getId());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index 12304bb..539bd7d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.giraph.benchmark;
import org.apache.commons.cli.CommandLine;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
@@ -52,10 +53,13 @@ public class AggregatorsBenchmark extends GiraphBenchmark {
/**
* Vertex class for AggregatorsBenchmark
*/
- public static class AggregatorsBenchmarkVertex extends
- Vertex<LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> {
+ public static class AggregatorsBenchmarkComputation extends
+ BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
+ DoubleWritable> {
@Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
int n = getNumAggregators(getConf());
long superstep = getSuperstep();
int w = getWorkerContextAggregated(getConf(), superstep);
@@ -71,7 +75,7 @@ public class AggregatorsBenchmark extends GiraphBenchmark {
((LongWritable) getAggregatedValue("p" + i)).get());
}
if (superstep > 2) {
- voteToHalt();
+ vertex.voteToHalt();
}
}
}
@@ -201,7 +205,7 @@ public class AggregatorsBenchmark extends GiraphBenchmark {
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
- conf.setVertexClass(AggregatorsBenchmarkVertex.class);
+ conf.setComputationClass(AggregatorsBenchmarkComputation.class);
conf.setMasterComputeClass(AggregatorsBenchmarkMasterCompute.class);
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
conf.setWorkerContextClass(AggregatorsBenchmarkWorkerContext.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 0f8d284..bd2939e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -31,7 +31,7 @@ import com.google.common.collect.Sets;
import java.util.Set;
/**
- * Benchmark for {@link PageRankVertex}
+ * Benchmark for {@link PageRankComputation}
*/
public class PageRankBenchmark extends GiraphBenchmark {
@Override
@@ -44,7 +44,7 @@ public class PageRankBenchmark extends GiraphBenchmark {
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
- conf.setVertexClass(PageRankVertex.class);
+ conf.setComputationClass(PageRankComputation.class);
conf.setOutEdgesClass(IntNullArrayEdges.class);
conf.setCombinerClass(FloatSumCombiner.class);
conf.setVertexInputFormatClass(
@@ -54,7 +54,7 @@ public class PageRankBenchmark extends GiraphBenchmark {
BenchmarkOption.VERTICES.getOptionIntValue(cmd));
conf.setInt(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
BenchmarkOption.EDGES_PER_VERTEX.getOptionIntValue(cmd));
- conf.setInt(PageRankVertex.SUPERSTEP_COUNT,
+ conf.setInt(PageRankComputation.SUPERSTEP_COUNT,
BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
conf.setFloat(PseudoRandomInputFormatConstants.LOCAL_EDGES_MIN_RATIO,
BenchmarkOption.LOCAL_EDGES_MIN_RATIO.getOptionFloatValue(cmd,
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
new file mode 100644
index 0000000..e891ff2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankComputation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of PageRank in which vertex ids are ints, page rank values
+ * are floats, and graph is unweighted.
+ */
+public class PageRankComputation extends BasicComputation<IntWritable,
+ FloatWritable, NullWritable, FloatWritable> {
+ /** Number of supersteps */
+ public static final String SUPERSTEP_COUNT =
+ "giraph.pageRank.superstepCount";
+
+ @Override
+ public void compute(
+ Vertex<IntWritable, FloatWritable, NullWritable> vertex,
+ Iterable<FloatWritable> messages) throws IOException {
+ if (getSuperstep() >= 1) {
+ float sum = 0;
+ for (FloatWritable message : messages) {
+ sum += message.get();
+ }
+ vertex.getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum);
+ }
+
+ if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
+ sendMessageToAllEdges(vertex,
+ new FloatWritable(vertex.getValue().get() / vertex.getNumEdges()));
+ } else {
+ vertex.voteToHalt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
deleted file mode 100644
index 9900a44..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankVertex.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.benchmark;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-/**
- * Implementation of PageRank in which vertex ids are ints, page rank values
- * are floats, and graph is unweighted.
- */
-public class PageRankVertex extends Vertex<IntWritable, FloatWritable,
- NullWritable, FloatWritable> {
- /** Number of supersteps */
- public static final String SUPERSTEP_COUNT =
- "giraph.pageRank.superstepCount";
-
- @Override
- public void compute(Iterable<FloatWritable> messages) throws IOException {
- if (getSuperstep() >= 1) {
- float sum = 0;
- for (FloatWritable message : messages) {
- sum += message.get();
- }
- getValue().set((0.15f / getTotalNumVertices()) + 0.85f * sum);
- }
-
- if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
- sendMessageToAllEdges(
- new FloatWritable(getValue().get() / getNumEdges()));
- } else {
- voteToHalt();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
index 5c7e019..cf2e6eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.giraph.benchmark;
import org.apache.commons.cli.CommandLine;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
@@ -35,6 +36,7 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
+import java.io.IOException;
import java.util.Random;
import java.util.Set;
@@ -257,25 +259,26 @@ public class RandomMessageBenchmark extends GiraphBenchmark {
/**
* Actual message computation (messaging in this case)
*/
- public static class RandomMessageVertex extends Vertex<LongWritable,
- DoubleWritable, DoubleWritable, BytesWritable> {
+ public static class RandomMessageComputation extends BasicComputation<
+ LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
@Override
- public void compute(Iterable<BytesWritable> messages) {
- RandomMessageBenchmarkWorkerContext workerContext =
- (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+ Iterable<BytesWritable> messages) throws IOException {
+ RandomMessageBenchmarkWorkerContext workerContext = getWorkerContext();
if (getSuperstep() < workerContext.getNumSupersteps()) {
for (int i = 0; i < workerContext.getNumMessagePerEdge(); i++) {
workerContext.randomizeMessageBytes();
- sendMessageToAllEdges(
+ sendMessageToAllEdges(vertex,
new BytesWritable(workerContext.getMessageBytes()));
long bytesSent = workerContext.getMessageBytes().length *
- getNumEdges();
+ vertex.getNumEdges();
aggregate(AGG_SUPERSTEP_TOTAL_BYTES, new LongWritable(bytesSent));
aggregate(AGG_SUPERSTEP_TOTAL_MESSAGES,
- new LongWritable(getNumEdges()));
+ new LongWritable(vertex.getNumEdges()));
}
} else {
- voteToHalt();
+ vertex.voteToHalt();
}
}
}
@@ -290,7 +293,7 @@ public class RandomMessageBenchmark extends GiraphBenchmark {
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
- conf.setVertexClass(RandomMessageVertex.class);
+ conf.setComputationClass(RandomMessageComputation.class);
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
index 8e6c877..0dd4529 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsBenchmark.java
@@ -58,13 +58,13 @@ public class ShortestPathsBenchmark extends GiraphBenchmark {
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
- conf.setVertexClass(ShortestPathsVertex.class);
+ conf.setComputationClass(ShortestPathsComputation.class);
if (EDGES_CLASS.getOptionIntValue(cmd, 1) == 1) {
conf.setOutEdgesClass(ArrayListEdges.class);
} else {
conf.setOutEdgesClass(HashMapEdges.class);
}
- LOG.info("Using class " + GiraphConstants.VERTEX_CLASS.get(conf));
+ LOG.info("Using class " + GiraphConstants.COMPUTATION_CLASS.get(conf));
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
if (!NO_COMBINER.optionTurnedOn(cmd)) {
conf.setCombinerClass(MinimumDoubleCombiner.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
new file mode 100644
index 0000000..ad0600c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsComputation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Shortest paths algorithm.
+ */
+public class ShortestPathsComputation extends BasicComputation<LongWritable,
+ DoubleWritable, DoubleWritable, DoubleWritable> {
+ /** Source id. */
+ public static final String SOURCE_ID =
+ "giraph.shortestPathsBenchmark.sourceId";
+ /** Default source id. */
+ public static final long SOURCE_ID_DEFAULT = 1;
+
+ /**
+ * Check if vertex is source from which to calculate shortest paths.
+ *
+ * @param vertex Vertex
+ * @return True iff vertex is source for shortest paths
+ */
+ private boolean isSource(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex) {
+ return vertex.getId().get() ==
+ getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() == 0) {
+ vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
+ }
+
+ double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
+ for (DoubleWritable message : messages) {
+ minDist = Math.min(minDist, message.get());
+ }
+
+ if (minDist < vertex.getValue().get()) {
+ vertex.setValue(new DoubleWritable(minDist));
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ double distance = minDist + edge.getValue().get();
+ sendMessage(edge.getTargetVertexId(),
+ new DoubleWritable(distance));
+ }
+ }
+
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java
deleted file mode 100644
index c1b77d1..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ShortestPathsVertex.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.benchmark;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/**
- * Shortest paths algorithm.
- */
-public class ShortestPathsVertex extends Vertex<LongWritable, DoubleWritable,
- DoubleWritable, DoubleWritable> {
- /** Source id. */
- public static final String SOURCE_ID =
- "giraph.shortestPathsBenchmark.sourceId";
- /** Default source id. */
- public static final long SOURCE_ID_DEFAULT = 1;
-
- private boolean isSource() {
- return getId().get() == getConf().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
- }
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- if (getSuperstep() == 0) {
- setValue(new DoubleWritable(Double.MAX_VALUE));
- }
-
- double minDist = isSource() ? 0d : Double.MAX_VALUE;
- for (DoubleWritable message : messages) {
- minDist = Math.min(minDist, message.get());
- }
-
- if (minDist < getValue().get()) {
- setValue(new DoubleWritable(minDist));
- for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
- double distance = minDist + edge.getValue().get();
- sendMessage(edge.getTargetVertexId(),
- new DoubleWritable(distance));
- }
- }
-
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
index 3fc514a..2077674 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankBenchmark.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Sets;
import java.util.Set;
/**
- * Benchmark for {@link WeightedPageRankVertex}
+ * Benchmark for {@link WeightedPageRankComputation}
*/
public class WeightedPageRankBenchmark extends GiraphBenchmark {
/** Class logger */
@@ -88,7 +88,7 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
*/
protected void prepareConfiguration(GiraphConfiguration configuration,
CommandLine cmd) {
- configuration.setVertexClass(WeightedPageRankVertex.class);
+ configuration.setComputationClass(WeightedPageRankComputation.class);
int edgesClassOption = EDGES_CLASS.getOptionIntValue(cmd, 1);
switch (edgesClassOption) {
case 0:
@@ -149,7 +149,7 @@ public class WeightedPageRankBenchmark extends GiraphBenchmark {
SimpleLongRangePartitionerFactory.class);
}
- configuration.setInt(WeightedPageRankVertex.SUPERSTEP_COUNT,
+ configuration.setInt(WeightedPageRankComputation.SUPERSTEP_COUNT,
BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java
new file mode 100644
index 0000000..18182ed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankComputation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of Page Rank algorithm on a weighted graph.
+ */
+public class WeightedPageRankComputation extends BasicComputation<LongWritable,
+ DoubleWritable, DoubleWritable, DoubleWritable> {
+ /** Number of supersteps */
+ public static final String SUPERSTEP_COUNT =
+ "giraph.weightedPageRank.superstepCount";
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() == 0) {
+ // Normalize out edge weights
+ double outEdgeSum = 0;
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ outEdgeSum += edge.getValue().get();
+ }
+ for (MutableEdge<LongWritable, DoubleWritable> edge :
+ vertex.getMutableEdges()) {
+ edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum));
+ }
+ } else {
+ double messageSum = 0;
+ for (DoubleWritable message : messages) {
+ messageSum += message.get();
+ }
+ vertex.getValue().set(
+ (0.15f / getTotalNumVertices()) + 0.85f * messageSum);
+ }
+
+ if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
+ for (Edge<LongWritable, DoubleWritable> edge : vertex.getEdges()) {
+ sendMessage(edge.getTargetVertexId(),
+ new DoubleWritable(
+ vertex.getValue().get() * edge.getValue().get()));
+ }
+ } else {
+ vertex.voteToHalt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
deleted file mode 100644
index 70f0f61..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/WeightedPageRankVertex.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.benchmark;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.MutableEdge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/**
- * Implementation of Page Rank algorithm on a weighted graph.
- */
-public class WeightedPageRankVertex extends Vertex<LongWritable, DoubleWritable,
- DoubleWritable, DoubleWritable> {
- /** Number of supersteps */
- public static final String SUPERSTEP_COUNT =
- "giraph.weightedPageRank.superstepCount";
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- if (getSuperstep() == 0) {
- // Normalize out edge weights
- double outEdgeSum = 0;
- for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
- outEdgeSum += edge.getValue().get();
- }
- for (MutableEdge<LongWritable, DoubleWritable> edge : getMutableEdges()) {
- edge.setValue(new DoubleWritable(edge.getValue().get() / outEdgeSum));
- }
- } else {
- double messageSum = 0;
- for (DoubleWritable message : messages) {
- messageSum += message.get();
- }
- getValue().set((0.15f / getTotalNumVertices()) + 0.85f * messageSum);
- }
-
- if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, 0)) {
- for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
- sendMessage(edge.getTargetVertexId(),
- new DoubleWritable(getValue().get() * edge.getValue().get()));
- }
- } else {
- voteToHalt();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 178c96f..ff3f06d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -60,12 +60,11 @@ import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class BspService<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements Watcher, CentralizedService<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ implements Watcher, CentralizedService<I, V, E> {
/** Unset superstep */
public static final long UNSET_SUPERSTEP = Long.MIN_VALUE;
/** Input superstep (superstep when loading the vertices happens) */
@@ -218,7 +217,7 @@ public abstract class BspService<I extends WritableComparable,
private final List<BspEvent> registeredBspEvents =
new ArrayList<BspEvent>();
/** Immutable configuration of the job*/
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Job context (mainly for progress) */
private final Mapper<?, ?, ?, ?>.Context context;
/** Cached superstep (from ZooKeeper) */
@@ -236,9 +235,9 @@ public abstract class BspService<I extends WritableComparable,
/** Combination of hostname '_' partition (unique id) */
private final String hostnamePartitionId;
/** Graph partitioner */
- private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory;
+ private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
/** Mapper that will do the graph computation */
- private final GraphTaskManager<I, V, E, M> graphTaskManager;
+ private final GraphTaskManager<I, V, E> graphTaskManager;
/** File system */
private final FileSystem fs;
/** Checkpoint frequency */
@@ -255,7 +254,7 @@ public abstract class BspService<I extends WritableComparable,
public BspService(String serverPortList,
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
- GraphTaskManager<I, V, E, M> graphTaskManager) {
+ GraphTaskManager<I, V, E> graphTaskManager) {
this.vertexInputSplitsEvents = new InputSplitEvents(context);
this.edgeInputSplitsEvents = new InputSplitEvents(context);
this.connectedEvent = new PredicateLock(context);
@@ -280,8 +279,7 @@ public abstract class BspService<I extends WritableComparable,
this.context = context;
this.graphTaskManager = graphTaskManager;
- this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
- context.getConfiguration());
+ this.conf = graphTaskManager.getConf();
this.jobId = conf.get("mapred.job.id", "Unknown Job");
this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
@@ -605,7 +603,7 @@ public abstract class BspService<I extends WritableComparable,
return fs;
}
- public final ImmutableClassesGiraphConfiguration<I, V, E, M>
+ public final ImmutableClassesGiraphConfiguration<I, V, E>
getConfiguration() {
return conf;
}
@@ -626,7 +624,7 @@ public abstract class BspService<I extends WritableComparable,
return taskPartition;
}
- public final GraphTaskManager<I, V, E, M> getGraphTaskManager() {
+ public final GraphTaskManager<I, V, E> getGraphTaskManager() {
return graphTaskManager;
}
@@ -885,7 +883,7 @@ public abstract class BspService<I extends WritableComparable,
*
* @return Instantiated graph partitioner factory
*/
- protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() {
+ protected GraphPartitionerFactory<I, V, E> getGraphPartitionerFactory() {
return graphPartitionerFactory;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index 2281903..ff3e427 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.io.WritableComparable;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface CentralizedService<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 5f84ece..fb98b00 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -33,12 +33,11 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface CentralizedServiceMaster<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> extends
- CentralizedService<I, V, E, M> {
+ V extends Writable, E extends Writable> extends
+ CentralizedService<I, V, E> {
/**
* Setup (must be called prior to any other function)
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 1c7bde4..4b0f985 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -21,7 +21,6 @@ package org.apache.giraph.bsp;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.graph.FinishedSuperstepStats;
-import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
@@ -47,12 +46,11 @@ import java.util.List;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface CentralizedServiceWorker<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends CentralizedService<I, V, E, M> {
+ V extends Writable, E extends Writable>
+ extends CentralizedService<I, V, E> {
/**
* Setup (must be called prior to any other function)
*
@@ -73,7 +71,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
*
* @return Worker client
*/
- WorkerClient<I, V, E, M> getWorkerClient();
+ WorkerClient<I, V, E> getWorkerClient();
/**
* Get the worker context.
@@ -97,7 +95,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
*
* @return The partition store for this worker.
*/
- PartitionStore<I, V, E, M> getPartitionStore();
+ PartitionStore<I, V, E> getPartitionStore();
/**
* Both the vertices and the messages need to be checkpointed in order
@@ -121,23 +119,19 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
* Take all steps prior to actually beginning the computation of a
* superstep.
*
- * @param graphState Current graph state
* @return Collection of all the partition owners from the master for this
* superstep.
*/
- Collection<? extends PartitionOwner> startSuperstep(
- GraphState<I, V, E, M> graphState);
+ Collection<? extends PartitionOwner> startSuperstep();
/**
* Worker is done with its portion of the superstep. Report the
* worker level statistics after the computation.
*
- * @param graphState Current graph state
* @param partitionStatsList All the partition stats for this worker
* @return Stats of the superstep completion
*/
FinishedSuperstepStats finishSuperstep(
- GraphState<I, V, E, M> graphState,
List<PartitionStats> partitionStatsList);
/**
@@ -195,7 +189,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
*
* @return the GraphTaskManager instance for this compute node
*/
- GraphTaskManager<I, V, E, M> getGraphTaskManager();
+ GraphTaskManager<I, V, E> getGraphTaskManager();
/**
* Operations that will be called if there is a failure by a worker.
@@ -207,7 +201,7 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
*
* @return Server data
*/
- ServerData<I, V, E, M> getServerData();
+ ServerData<I, V, E> getServerData();
/**
* Get worker aggregator handler
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
index 1e8bdf9..92d0926 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendCache.java
@@ -64,7 +64,7 @@ public abstract class SendCache<I extends WritableComparable, T,
* ratio of the average request size)
*/
public SendCache(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceWorker<?, ?, ?, ?> serviceWorker,
+ CentralizedServiceWorker<?, ?, ?> serviceWorker,
int maxRequestSize,
float additionalRequestSize) {
this.conf = conf;
@@ -176,4 +176,8 @@ public abstract class SendCache<I extends WritableComparable, T,
}
return allData;
}
+
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index fbc911f..5513da2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -46,7 +46,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
* @param serviceWorker Service worker
*/
public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+ CentralizedServiceWorker<?, ?, ?> serviceWorker) {
super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 7d2a888..40023c2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -45,14 +45,15 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
* @param serviceWorker Service worker
*/
public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
+ CentralizedServiceWorker<?, ?, ?> serviceWorker) {
super(conf, serviceWorker, MAX_MSG_REQUEST_SIZE.get(conf),
ADDITIONAL_MSG_REQUEST_SIZE.get(conf));
}
@Override
public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
- return new ByteArrayVertexIdMessages<I, M>();
+ return new ByteArrayVertexIdMessages<I, M>(
+ getConf().getOutgoingMessageValueClass());
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
index 67f74f1..9348e61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
@@ -33,14 +33,13 @@ import java.util.Map;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class SendMutationsCache<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/** Internal cache */
- private Map<Integer, Map<I, VertexMutations<I, V, E, M>>> mutationCache =
- new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
+ private Map<Integer, Map<I, VertexMutations<I, V, E>>> mutationCache =
+ new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
/** Number of mutations in each partition */
private final Map<Integer, Integer> mutationCountMap =
new HashMap<Integer, Integer>();
@@ -53,17 +52,17 @@ public class SendMutationsCache<I extends WritableComparable,
* @param destVertexId Destination vertex id
* @return Mutations for the vertex
*/
- private VertexMutations<I, V, E, M> getVertexMutations(
+ private VertexMutations<I, V, E> getVertexMutations(
Integer partitionId, I destVertexId) {
- Map<I, VertexMutations<I, V, E, M>> idMutations =
+ Map<I, VertexMutations<I, V, E>> idMutations =
mutationCache.get(partitionId);
if (idMutations == null) {
- idMutations = new HashMap<I, VertexMutations<I, V, E, M>>();
+ idMutations = new HashMap<I, VertexMutations<I, V, E>>();
mutationCache.put(partitionId, idMutations);
}
- VertexMutations<I, V, E, M> mutations = idMutations.get(destVertexId);
+ VertexMutations<I, V, E> mutations = idMutations.get(destVertexId);
if (mutations == null) {
- mutations = new VertexMutations<I, V, E, M>();
+ mutations = new VertexMutations<I, V, E>();
idMutations.put(destVertexId, mutations);
}
return mutations;
@@ -97,7 +96,7 @@ public class SendMutationsCache<I extends WritableComparable,
public int addEdgeMutation(
Integer partitionId, I destVertexId, Edge<I, E> edge) {
// Get the mutations for this partition
- VertexMutations<I, V, E, M> mutations =
+ VertexMutations<I, V, E> mutations =
getVertexMutations(partitionId, destVertexId);
// Add the edge
@@ -118,7 +117,7 @@ public class SendMutationsCache<I extends WritableComparable,
public int removeEdgeMutation(
Integer partitionId, I vertexIndex, I destinationVertexIndex) {
// Get the mutations for this partition
- VertexMutations<I, V, E, M> mutations =
+ VertexMutations<I, V, E> mutations =
getVertexMutations(partitionId, vertexIndex);
// Remove the edge
@@ -136,9 +135,9 @@ public class SendMutationsCache<I extends WritableComparable,
* @return Number of mutations in the partition.
*/
public int addVertexMutation(
- Integer partitionId, Vertex<I, V, E, M> vertex) {
+ Integer partitionId, Vertex<I, V, E> vertex) {
// Get the mutations for this partition
- VertexMutations<I, V, E, M> mutations =
+ VertexMutations<I, V, E> mutations =
getVertexMutations(partitionId, vertex.getId());
// Add the vertex
@@ -158,7 +157,7 @@ public class SendMutationsCache<I extends WritableComparable,
public int removeVertexMutation(
Integer partitionId, I destVertexId) {
// Get the mutations for this partition
- VertexMutations<I, V, E, M> mutations =
+ VertexMutations<I, V, E> mutations =
getVertexMutations(partitionId, destVertexId);
// Remove the vertex
@@ -174,9 +173,9 @@ public class SendMutationsCache<I extends WritableComparable,
* @param partitionId Partition id
* @return Removed partition mutations
*/
- public Map<I, VertexMutations<I, V, E, M>> removePartitionMutations(
+ public Map<I, VertexMutations<I, V, E>> removePartitionMutations(
int partitionId) {
- Map<I, VertexMutations<I, V, E, M>> idMutations =
+ Map<I, VertexMutations<I, V, E>> idMutations =
mutationCache.remove(partitionId);
mutationCountMap.put(partitionId, 0);
return idMutations;
@@ -187,12 +186,12 @@ public class SendMutationsCache<I extends WritableComparable,
*
* @return All vertex mutations for all partitions
*/
- public Map<Integer, Map<I, VertexMutations<I, V, E, M>>>
+ public Map<Integer, Map<I, VertexMutations<I, V, E>>>
removeAllPartitionMutations() {
- Map<Integer, Map<I, VertexMutations<I, V, E, M>>> allMutations =
+ Map<Integer, Map<I, VertexMutations<I, V, E>>> allMutations =
mutationCache;
mutationCache =
- new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
+ new HashMap<Integer, Map<I, VertexMutations<I, V, E>>>();
mutationCountMap.clear();
return allMutations;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
index 31cf052..68ce095 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
@@ -36,23 +36,19 @@ import org.apache.log4j.Logger;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public class SendPartitionCache<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendPartitionCache.class);
/** Input split vertex cache (only used when loading from input split) */
- private final Map<PartitionOwner, Partition<I, V, E, M>>
+ private final Map<PartitionOwner, Partition<I, V, E>>
ownerPartitionMap = Maps.newHashMap();
- /** Number of messages in each partition */
- private final Map<PartitionOwner, Integer> messageCountMap =
- Maps.newHashMap();
/** Context */
private final Mapper<?, ?, ?, ?>.Context context;
/** Configuration */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+ private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
/**
* Regulates the size of outgoing Collections of vertices read
* by the local worker during INPUT_SUPERSTEP that are to be
@@ -69,7 +65,7 @@ public class SendPartitionCache<I extends WritableComparable,
*/
public SendPartitionCache(
Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+ ImmutableClassesGiraphConfiguration<I, V, E> configuration) {
this.context = context;
this.configuration = configuration;
transferRegulator =
@@ -89,9 +85,9 @@ public class SendPartitionCache<I extends WritableComparable,
* @param vertex Vertex to add
* @return A partition to send or null, if requirements are not met
*/
- public Partition<I, V, E, M> addVertex(PartitionOwner partitionOwner,
- Vertex<I, V, E, M> vertex) {
- Partition<I, V, E, M> partition =
+ public Partition<I, V, E> addVertex(PartitionOwner partitionOwner,
+ Vertex<I, V, E> vertex) {
+ Partition<I, V, E> partition =
ownerPartitionMap.get(partitionOwner);
if (partition == null) {
partition = configuration.createPartition(
@@ -101,7 +97,7 @@ public class SendPartitionCache<I extends WritableComparable,
}
transferRegulator.incrementCounters(partitionOwner, vertex);
- Vertex<I, V, E, M> oldVertex =
+ Vertex<I, V, E> oldVertex =
partition.putVertex(vertex);
if (oldVertex != null) {
LOG.warn("addVertex: Replacing vertex " + oldVertex +
@@ -121,7 +117,7 @@ public class SendPartitionCache<I extends WritableComparable,
*
* @return Owner partition map
*/
- public Map<PartitionOwner, Partition<I, V, E, M>> getOwnerPartitionMap() {
+ public Map<PartitionOwner, Partition<I, V, E>> getOwnerPartitionMap() {
return ownerPartitionMap;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 743a6f8..788be53 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -43,34 +43,38 @@ import java.util.concurrent.ConcurrentHashMap;
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class ServerData<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Partition store for this worker. */
- private volatile PartitionStore<I, V, E, M> partitionStore;
+ private volatile PartitionStore<I, V, E> partitionStore;
/** Edge store for this worker. */
- private final EdgeStore<I, V, E, M> edgeStore;
+ private final EdgeStore<I, V, E> edgeStore;
/** Message store factory */
private final
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
+ MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
+ messageStoreFactory;
/**
* Message store for incoming messages (messages which will be consumed
* in the next super step)
*/
- private volatile MessageStoreByPartition<I, M> incomingMessageStore;
+ private volatile MessageStoreByPartition<I, Writable>
+ incomingMessageStore;
/**
* Message store for current messages (messages which we received in
* previous super step and which will be consumed in current super step)
*/
- private volatile MessageStoreByPartition<I, M> currentMessageStore;
+ private volatile MessageStoreByPartition<I, Writable>
+ currentMessageStore;
/**
* Map of partition ids to incoming vertex mutations from other workers.
* (Synchronized access to values)
*/
- private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
- vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
+ private final ConcurrentHashMap<I, VertexMutations<I, V, E>>
+ vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E>>();
/**
* Holds aggregtors which current worker owns from current superstep
*/
@@ -89,28 +93,30 @@ public class ServerData<I extends WritableComparable,
* @param context Mapper context
*/
public ServerData(
- CentralizedServiceWorker<I, V, E, M> service,
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+ CentralizedServiceWorker<I, V, E> service,
+ ImmutableClassesGiraphConfiguration<I, V, E> conf,
+ MessageStoreFactory<I, Writable, MessageStoreByPartition<I, Writable>>
messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
-
+ this.conf = conf;
this.messageStoreFactory = messageStoreFactory;
- currentMessageStore = messageStoreFactory.newStore();
- incomingMessageStore = messageStoreFactory.newStore();
+ currentMessageStore =
+ messageStoreFactory.newStore(conf.getOutgoingMessageValueClass());
+ incomingMessageStore =
+ messageStoreFactory.newStore(conf.getIncomingMessageValueClass());
if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
partitionStore =
- new DiskBackedPartitionStore<I, V, E, M>(conf, context);
+ new DiskBackedPartitionStore<I, V, E>(conf, context);
} else {
partitionStore =
- new SimplePartitionStore<I, V, E, M>(conf, context);
+ new SimplePartitionStore<I, V, E>(conf, context);
}
- edgeStore = new EdgeStore<I, V, E, M>(service, conf, context);
+ edgeStore = new EdgeStore<I, V, E>(service, conf, context);
ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
allAggregatorData = new AllAggregatorServerData(context, conf);
}
- public EdgeStore<I, V, E, M> getEdgeStore() {
+ public EdgeStore<I, V, E> getEdgeStore() {
return edgeStore;
}
@@ -119,7 +125,7 @@ public class ServerData<I extends WritableComparable,
*
* @return The partition store
*/
- public PartitionStore<I, V, E, M> getPartitionStore() {
+ public PartitionStore<I, V, E> getPartitionStore() {
return partitionStore;
}
@@ -127,20 +133,24 @@ public class ServerData<I extends WritableComparable,
* Get message store for incoming messages (messages which will be consumed
* in the next super step)
*
+ * @param <M> Message data
* @return Incoming message store
*/
- public MessageStoreByPartition<I, M> getIncomingMessageStore() {
- return incomingMessageStore;
+ public <M extends Writable> MessageStoreByPartition<I, M>
+ getIncomingMessageStore() {
+ return (MessageStoreByPartition<I, M>) incomingMessageStore;
}
/**
* Get message store for current messages (messages which we received in
* previous super step and which will be consumed in current super step)
*
+ * @param <M> Message data
* @return Current message store
*/
- public MessageStoreByPartition<I, M> getCurrentMessageStore() {
- return currentMessageStore;
+ public <M extends Writable> MessageStoreByPartition<I, M>
+ getCurrentMessageStore() {
+ return (MessageStoreByPartition<I, M>) currentMessageStore;
}
/** Prepare for next super step */
@@ -154,7 +164,8 @@ public class ServerData<I extends WritableComparable,
}
}
currentMessageStore = incomingMessageStore;
- incomingMessageStore = messageStoreFactory.newStore();
+ incomingMessageStore =
+ messageStoreFactory.newStore(conf.getOutgoingMessageValueClass());
}
/**
@@ -162,7 +173,7 @@ public class ServerData<I extends WritableComparable,
*
* @return Vertex mutations
*/
- public ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
+ public ConcurrentHashMap<I, VertexMutations<I, V, E>>
getVertexMutations() {
return vertexMutations;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
index ebabf45..3759f6b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
@@ -33,11 +33,10 @@ import java.io.IOException;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface WorkerClient<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/**
* Setup the client.
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
index bc0637f..731d0ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
@@ -33,10 +33,9 @@ import java.io.IOException;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
public interface WorkerClientRequestProcessor<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable> {
/**
* Sends a message to destination vertex.
*
@@ -44,7 +43,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
* @param message Message to send.
* @return true if any network I/O occurred.
*/
- boolean sendMessageRequest(I destVertexId, M message);
+ boolean sendMessageRequest(I destVertexId, Writable message);
/**
* Sends a vertex to the appropriate partition owner
@@ -53,7 +52,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
* @param vertex Vertex to send
*/
void sendVertexRequest(PartitionOwner partitionOwner,
- Vertex<I, V, E, M> vertex);
+ Vertex<I, V, E> vertex);
/**
* Send a partition request (no batching).
@@ -62,7 +61,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
* @param partition Partition to send
*/
void sendPartitionRequest(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition);
+ Partition<I, V, E> partition);
/**
* Sends a request to the appropriate vertex range owner to add an edge
@@ -103,7 +102,7 @@ public interface WorkerClientRequestProcessor<I extends WritableComparable,
* @param vertex Vertex to be added
* @throws IOException
*/
- void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException;
+ void addVertexRequest(Vertex<I, V, E> vertex) throws IOException;
/**
* Sends a request to the appropriate vertex range owner to remove a vertex
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index e373b2c..bed07b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -18,7 +18,6 @@
package org.apache.giraph.comm;
-import org.apache.giraph.graph.GraphState;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -31,11 +30,10 @@ import java.net.InetSocketAddress;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
- * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface WorkerServer<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
+ V extends Writable, E extends Writable>
extends Closeable {
/**
* Get server address
@@ -46,17 +44,15 @@ public interface WorkerServer<I extends WritableComparable,
/**
* Prepare incoming messages for computation, and resolve mutation requests.
- *
- * @param graphState Current graph state
*/
- void prepareSuperstep(GraphState<I, V, E, M> graphState);
+ void prepareSuperstep();
/**
* Get server data
*
* @return Server data
*/
- ServerData<I, V, E, M> getServerData();
+ ServerData<I, V, E> getServerData();
/**
* Shuts down.