You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/15 01:54:22 UTC
svn commit: r1201987 [1/5] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Author: aching
Date: Tue Nov 15 00:54:20 2011
New Revision: 1201987
URL: http://svn.apache.org/viewvc?rev=1201987&view=rev
Log:
GIRAPH-11: Improve the graph distribution of Giraph. (aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (with props)
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/WritableUtils.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestGraphPartitioner.java
Removed:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SuperstepBalancer.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AutoBalancer.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexRangeBalancer.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/StaticBalancer.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRangeBalancer.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/pom.xml
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Tue Nov 15 00:54:20 2011
@@ -1,7 +1,9 @@
Giraph Change Log
Release 0.70.0 - unreleased
-
+
+ GIRAPH-11: Improve the graph distribution of Giraph. (aching)
+
GIRAPH-64: Create VertexRunner to make it easier to run users'
computations. (jghoman)
Modified: incubator/giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/pom.xml?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/pom.xml (original)
+++ incubator/giraph/trunk/pom.xml Tue Nov 15 00:54:20 2011
@@ -499,7 +499,7 @@ under the License.
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <version>3.3.1</version>
+ <version>3.3.3</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java Tue Nov 15 00:54:20 2011
@@ -147,7 +147,7 @@ public class PseudoRandomVertexInputForm
@Override
public BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> getCurrentVertex()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
BasicVertex<LongWritable, DoubleWritable, DoubleWritable, M> vertex =
BspUtils.createVertex(configuration);
long vertexId = startingVertexId + verticesRead;
@@ -163,11 +163,10 @@ public class PseudoRandomVertexInputForm
destVertexId =
new LongWritable(Math.abs(rand.nextLong()) %
aggregateVertices);
- } while (vertex.hasEdge(destVertexId));
+ } while (edges.containsKey(destVertexId));
edges.put(destVertexId, new DoubleWritable(rand.nextDouble()));
}
vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null);
-
++verticesRead;
if (LOG.isDebugEnabled()) {
LOG.debug("next: Return vertexId=" +
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 15 00:54:20 2011
@@ -19,14 +19,19 @@
package org.apache.giraph.bsp;
import java.io.IOException;
-import java.util.NavigableMap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.giraph.graph.AggregatorUsage;
+import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.GraphMapper;
-import org.apache.giraph.graph.VertexRange;
-import org.apache.giraph.graph.BasicVertexRangeBalancer;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.WorkerContext;
/**
@@ -41,41 +46,34 @@ public interface CentralizedServiceWorke
M extends Writable>
extends CentralizedService<I, V, E, M>, AggregatorUsage {
/**
- * Get the hostname of this worker
+ * Get the worker information
*
- * @return hostname of this worker
+ * @return Worker information
*/
- String getHostname();
-
- /**
- * Get the port of the RPC server on this worker.
- *
- * @return RPC server of this worker
- */
- int getPort();
+ WorkerInfo getWorkerInfo();
/**
- *
+ *
* @return worker's WorkerContext
*/
WorkerContext getWorkerContext();
/**
- * Get a synchronized map to the partitions and their sorted vertex lists.
- * This could be used to run compute for the vertices or checkpointing.
+ * Get a map of the partition id to the partition for this worker.
+ * The partitions contain the vertices for
+ * this worker and can be used to run compute() for the vertices or do
+ * checkpointing.
*
- * @return map of max vertex index to list of vertices on that vertex range
+ * @return Map of partition id to the partitions that this worker owns.
*/
- NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap();
+ Map<Integer, Partition<I, V, E, M>> getPartitionMap();
/**
- * Get the current map to the partitions and their sorted vertex lists.
- * This is needed by the communication service to shift incoming messages
- * to the vertex lists before the new map gets synchronized.
+ * Get a collection of all the partition owners.
*
- * @return map of max vertex index to list of vertices on that vertex range
+ * @return Collection of all the partition owners.
*/
- NavigableMap<I, VertexRange<I, V, E, M>> getCurrentVertexRangeMap();
+ Collection<? extends PartitionOwner> getPartitionOwners();
/**
* Both the vertices and the messages need to be checkpointed in order
@@ -98,58 +96,57 @@ public interface CentralizedServiceWorke
* Take all steps prior to actually beginning the computation of a
* superstep.
*
- * @return true if part of this superstep, false otherwise
+ * @return Collection of all the partition owners from the master for this
+ * superstep.
*/
- boolean startSuperstep();
+ Collection<? extends PartitionOwner> startSuperstep();
/**
* Worker is done with its portion of the superstep. Report the
* worker level statistics after the computation.
*
- * @param workerFinishedVertices Number of finished vertices on this worker
- * @param workerVertices Number of vertices on this worker
- * @param workerEdges Number of edges on this worker
+ * @param partitionStatsList All the partition stats for this worker
* @param workersSentMessages Number of messages sent on this worker
* @return true if this is the last superstep, false otherwise
*/
- boolean finishSuperstep(long workerFinishedVertices,
- long workerVertices,
- long workerEdges,
+ boolean finishSuperstep(List<PartitionStats> partitionStatsList,
long workersSentMessages);
-
/**
- * Every client will need to get a vertex range for a vertex id so that
- * they know where to sent the request.
+ * Get the partition that a vertex index would belong to
*
- * @param superstep Superstep to look for
- * @param vertexIndex Vertex index to look for
- * @return VertexRange that should contain this vertex if it exists
+ * @param vertexIndex Index of the vertex that is used to find the correct
+ * partition.
+ * @return Correct partition if it exists on this worker, null otherwise.
*/
- VertexRange<I, V, E, M> getVertexRange(long superstep, I vertexIndex);
+ public Partition<I, V, E, M> getPartition(I vertexIndex);
/**
- * Get the total vertices in the entire application during a given
- * superstep. Note that this is the number of vertices prior to the
- * superstep starting and does not change during the superstep.
+ * Every client will need to get a partition owner from a vertex id so that
+ * they know which worker to send the request to.
*
- * @return count of all the vertices (local and non-local together)
+ * @param vertexIndex Vertex index to look for
+ * @return PartitionOwner that should contain this vertex if it exists
*/
- long getTotalVertices();
+ PartitionOwner getVertexPartitionOwner(I vertexIndex);
/**
- * Get the total edges in the entire application during a given
- * superstep. Note that this is the number of edges prior to the
- * superstep starting and does not change during the superstep.
+ * Look up a vertex on a worker given its vertex index.
*
- * @return count of all the edges (local and non-local together)
+ * @param vertexIndex Vertex index to look for
+ * @return Vertex if it exists on this worker.
*/
- long getTotalEdges();
+ BasicVertex<I, V, E, M> getVertex(I vertexIndex);
/**
- * If desired by the user, vertex ranges are redistributed among workers
- * according to the chosen {@link BasicVertexRangeBalancer}.
+ * If desired by the user, vertex partitions are redistributed among
+ * workers according to the chosen
+ * {@link org.apache.giraph.graph.partition.GraphPartitionerFactory}.
+ *
+ * @param masterSetPartitionOwners Partition owner info passed from the
+ * master.
*/
- void exchangeVertexRanges();
+ void exchangeVertexPartitions(
+ Collection<? extends PartitionOwner> masterSetPartitionOwners);
/**
* Get the GraphMapper that this service is using. Vertices need to know
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Tue Nov 15 00:54:20 2011
@@ -27,7 +27,6 @@ import org.apache.giraph.graph.MutableVe
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.VertexMutations;
-import org.apache.giraph.graph.VertexRange;
import org.apache.giraph.graph.VertexResolver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -48,7 +47,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -56,6 +54,10 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
+
/*if[HADOOP_FACEBOOK]
import org.apache.hadoop.ipc.ProtocolSignature;
end[HADOOP_FACEBOOK]*/
@@ -96,6 +98,12 @@ public abstract class BasicRPCCommunicat
private final Map<InetSocketAddress, PeerConnection> peerConnections =
new HashMap<InetSocketAddress, PeerConnection>();
/**
+ * Cached map of partition ids to remote socket address. Needs to be
+ * synchronized.
+ */
+ private final Map<Integer, InetSocketAddress> partitionIndexAddressMap =
+ new HashMap<Integer, InetSocketAddress>();
+ /**
* Thread pool for message flush threads
*/
private final ExecutorService executor;
@@ -122,24 +130,20 @@ public abstract class BasicRPCCommunicat
private final Map<I, List<M>> transientInMessages =
new HashMap<I, List<M>>();
/**
- * Map of vertex ranges to any incoming vertices from other workers.
+ * Map of partition ids to incoming vertices from other workers.
* (Synchronized)
*/
- private final Map<I, List<BasicVertex<I, V, E, M>>>
- inVertexRangeMap =
- new TreeMap<I, List<BasicVertex<I, V, E, M>>>();
+ private final Map<Integer, List<BasicVertex<I, V, E, M>>>
+ inPartitionVertexMap =
+ new HashMap<Integer, List<BasicVertex<I, V, E, M>>>();
+
/**
* Map from vertex index to all vertex mutations
*/
private final Map<I, VertexMutations<I, V, E, M>>
inVertexMutationsMap =
new TreeMap<I, VertexMutations<I, V, E, M>>();
- /**
- * Cached map of vertex ranges to remote socket address. Needs to be
- * synchronized.
- */
- private final Map<I, InetSocketAddress> vertexIndexMapAddressMap =
- new HashMap<I, InetSocketAddress>();
+
/** Maximum size of cached message list, before sending it out */
private final int maxSize;
/** Cached job id */
@@ -380,8 +384,18 @@ public abstract class BasicRPCCommunicat
numHandlers + " handlers and " + numFlushThreads +
" flush threads");
}
+ }
- connectAllRPCProxys(this.jobId, this.jobToken);
+ @Override
+ public void setup() {
+ try {
+ connectAllRPCProxys(this.jobId, this.jobToken);
+ } catch (IOException e) {
+ throw new IllegalStateException("setup: Got IOException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("setup: Got InterruptedException",
+ e);
+ }
}
protected abstract CommunicationsInterface<I, V, E, M> getRPCProxy(
@@ -400,17 +414,17 @@ public abstract class BasicRPCCommunicat
private void connectAllRPCProxys(String jobId, J jobToken)
throws IOException, InterruptedException {
final int maxTries = 5;
- for (VertexRange<I, V, E, M> vertexRange :
- service.getVertexRangeMap().values()) {
+ for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
int tries = 0;
while (tries < maxTries) {
try {
- startPeerConnectionThread(vertexRange, jobId, jobToken);
+ startPeerConnectionThread(
+ partitionOwner.getWorkerInfo(), jobId, jobToken);
break;
} catch (IOException e) {
LOG.warn("connectAllRPCProxys: Failed on attempt " +
tries + " of " + maxTries +
- " to connect to " + vertexRange.toString());
+ " to connect to " + partitionOwner.toString(), e);
++tries;
}
}
@@ -418,24 +432,27 @@ public abstract class BasicRPCCommunicat
}
/**
- * Starts a thread for a vertex range if any only if the inet socket
+ * Creates the connections to remote RPCs if and only if the inet socket
* address doesn't already exist.
*
- * @param vertexRange
+ * @param workerInfo Worker info (hostname and port) of the remote worker
+ * to connect to
+ * @param jobId Id of the job
+ * @param jobToken Required for secure Hadoop
* @throws IOException
+ * @throws InterruptedException
*/
- private void startPeerConnectionThread(VertexRange<I, V, E, M> vertexRange,
+ private void startPeerConnectionThread(WorkerInfo workerInfo,
String jobId,
J jobToken)
throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("startPeerConnectionThread: hostname " +
- vertexRange.getHostname() + ", port " +
- vertexRange.getPort());
+ workerInfo.getHostname() + ", port " +
+ workerInfo.getPort());
}
final InetSocketAddress addr =
- new InetSocketAddress(vertexRange.getHostname(),
- vertexRange.getPort());
+ new InetSocketAddress(workerInfo.getHostname(),
+ workerInfo.getPort());
// Cheap way to hold both the hostname and port (rather than
// make a class)
InetSocketAddress addrUnresolved =
@@ -448,9 +465,7 @@ public abstract class BasicRPCCommunicat
outMsgMap = outMessages.get(addrUnresolved);
if (LOG.isDebugEnabled()) {
LOG.debug("startPeerConnectionThread: Connecting to " +
- vertexRange.getHostname() + ", port = " +
- vertexRange.getPort() + ", max index = " +
- vertexRange.getMaxIndex() + ", addr = " + addr +
+ workerInfo.toString() + ", addr = " + addr +
" if outMsgMap (" + outMsgMap + ") == null ");
}
if (outMsgMap != null) { // this host has already been added
@@ -551,25 +566,26 @@ end[HADOOP_FACEBOOK]*/
}
@Override
- public final void putVertexList(I vertexIndexMax,
+ public final void putVertexList(int partitionId,
VertexList<I, V, E, M> vertexList)
throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("putVertexList: On vertex range " + vertexIndexMax +
+ LOG.debug("putVertexList: On partition id " + partitionId +
" adding vertex list of size " + vertexList.size());
}
- synchronized (inVertexRangeMap) {
+ synchronized (inPartitionVertexMap) {
if (vertexList.size() == 0) {
return;
}
- if (!inVertexRangeMap.containsKey(vertexIndexMax)) {
- inVertexRangeMap.put(vertexIndexMax,
- new ArrayList<BasicVertex<I, V, E, M>>());
- }
- List<BasicVertex<I, V, E, M>> tmpVertexList =
- inVertexRangeMap.get(vertexIndexMax);
- for (BasicVertex<I, V, E, M> hadoopVertex : vertexList) {
- tmpVertexList.add(hadoopVertex);
+ if (!inPartitionVertexMap.containsKey(partitionId)) {
+ inPartitionVertexMap.put(partitionId,
+ new ArrayList<BasicVertex<I, V, E, M>>(vertexList));
+ } else {
+ List<BasicVertex<I, V, E, M>> tmpVertexList =
+ inPartitionVertexMap.get(partitionId);
+ for (BasicVertex<I, V, E, M> hadoopVertex : vertexList) {
+ tmpVertexList.add(hadoopVertex);
+ }
}
}
}
@@ -644,31 +660,28 @@ end[HADOOP_FACEBOOK]*/
}
@Override
- public final void sendVertexListReq(I vertexIndexMax,
- List<BasicVertex<I, V, E, M>> vertexList) {
+ public final void sendPartitionReq(WorkerInfo workerInfo,
+ Partition<I, V, E, M> partition) {
// Internally, break up the sending so that the list doesn't get too
// big.
VertexList<I, V, E, M> hadoopVertexList =
new VertexList<I, V, E, M>();
- InetSocketAddress addr = getInetSocketAddress(vertexIndexMax);
+ InetSocketAddress addr =
+ getInetSocketAddress(workerInfo, partition.getPartitionId());
CommunicationsInterface<I, V, E, M> rpcProxy =
peerConnections.get(addr).getRPCProxy();
if (LOG.isInfoEnabled()) {
- LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " +
- addr + ", with vertex index " + vertexIndexMax +
- ", list " + vertexList);
- }
- if (peerConnections.get(addr).isProxy == false) {
- throw new RuntimeException("sendVertexList: Impossible to send " +
- "to self for vertex index max " + vertexIndexMax);
- }
- for (long i = 0; i < vertexList.size(); ++i) {
- hadoopVertexList.add(
- (Vertex<I, V, E, M>) vertexList.get((int) i));
+ LOG.info("sendPartitionReq: Sending to " + rpcProxy.getName() +
+ " " + addr + " from " + workerInfo +
+ ", with partition " + partition);
+ }
+ for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
+ hadoopVertexList.add(vertex);
if (hadoopVertexList.size() >= MAX_VERTICES_PER_RPC) {
try {
- rpcProxy.putVertexList(vertexIndexMax, hadoopVertexList);
+ rpcProxy.putVertexList(partition.getPartitionId(),
+ hadoopVertexList);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -677,7 +690,8 @@ end[HADOOP_FACEBOOK]*/
}
if (hadoopVertexList.size() > 0) {
try {
- rpcProxy.putVertexList(vertexIndexMax, hadoopVertexList);
+ rpcProxy.putVertexList(partition.getPartitionId(),
+ hadoopVertexList);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -685,35 +699,48 @@ end[HADOOP_FACEBOOK]*/
}
/**
- * Fill the socket address cache for the vertex range
+ * Fill the socket address cache for the worker info and its partition.
*
- * @param destVertex vertex
+ * @param workerInfo Worker information to get the socket address
+ * @param partitionId Id of the partition whose socket address is cached
* @return address of the vertex range server containing this vertex
*/
- private InetSocketAddress getInetSocketAddress(I destVertex) {
- VertexRange<I, V, E, M> destVertexRange =
- service.getVertexRange(service.getSuperstep(), destVertex);
- if (destVertexRange == null) {
- LOG.error("getInetSocketAddress: No vertexRange found for " +
- destVertex);
- throw new RuntimeException("getInetSocketAddress: Dest vertex " +
- destVertex);
- }
-
- synchronized(vertexIndexMapAddressMap) {
+ private InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
+ int partitionId) {
+ synchronized(partitionIndexAddressMap) {
InetSocketAddress address =
- vertexIndexMapAddressMap.get(destVertexRange.getMaxIndex());
+ partitionIndexAddressMap.get(partitionId);
if (address == null) {
address = InetSocketAddress.createUnresolved(
- destVertexRange.getHostname(),
- destVertexRange.getPort());
- vertexIndexMapAddressMap.put(destVertexRange.getMaxIndex(),
- address);
+ workerInfo.getHostname(),
+ workerInfo.getPort());
+ partitionIndexAddressMap.put(partitionId, address);
+ }
+
+ if (address.getPort() != workerInfo.getPort() ||
+ !address.getHostName().equals(workerInfo.getHostname())) {
+ throw new IllegalStateException(
+ "getInetSocketAddress: Impossible that address " +
+ address + " does not match " + workerInfo);
}
+
return address;
}
}
+ /**
+ * Fill the socket address cache for the partition owner.
+ *
+ * @param destVertex Index of the destination vertex
+ * @return address of the worker that owns the partition containing this
+ * vertex
+ */
+ private InetSocketAddress getInetSocketAddress(I destVertex) {
+ PartitionOwner partitionOwner =
+ service.getVertexPartitionOwner(destVertex);
+ return getInetSocketAddress(partitionOwner.getWorkerInfo(),
+ partitionOwner.getPartitionId());
+ }
+
@Override
public final void sendMessageReq(I destVertex, M msg) {
InetSocketAddress addr = getInetSocketAddress(destVertex);
@@ -812,7 +839,8 @@ end[HADOOP_FACEBOOK]*/
Collection<Future<?>> futures = new ArrayList<Future<?>>();
// randomize peers in order to avoid hotspot on racks
- List<PeerConnection> peerList = new ArrayList<PeerConnection>(peerConnections.values());
+ List<PeerConnection> peerList =
+ new ArrayList<PeerConnection>(peerConnections.values());
Collections.shuffle(peerList);
for (PeerConnection pc : peerList) {
@@ -867,13 +895,11 @@ end[HADOOP_FACEBOOK]*/
}
if (inMessages.size() > 0) {
- // Assign the appropriate messages to each vertex
- NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
- service.getCurrentVertexRangeMap();
- for (VertexRange<I, V, E, M> vertexRange :
- vertexRangeMap.values()) {
- for (BasicVertex<I, V, E, M> vertex :
- vertexRange.getVertexMap().values()) {
+ // Assign the messages to each destination vertex (getting rid of
+ // the old ones)
+ for (Partition<I, V, E, M> partition :
+ service.getPartitionMap().values()) {
+ for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
vertex.getMsgList().clear();
List<M> msgList = inMessages.get(vertex.getVertexId());
if (msgList != null) {
@@ -884,7 +910,8 @@ end[HADOOP_FACEBOOK]*/
}
for (M msg : msgList) {
if (msg == null) {
- LOG.warn("null message in inMessages");
+ LOG.warn("prepareSuperstep: Null message " +
+ "in inMessages");
}
}
vertex.getMsgList().addAll(msgList);
@@ -919,10 +946,8 @@ end[HADOOP_FACEBOOK]*/
VertexResolver<I, V, E, M> vertexResolver =
BspUtils.createVertexResolver(
conf, service.getGraphMapper().getGraphState());
- VertexRange<I, V, E, M> vertexRange =
- service.getVertexRange(service.getSuperstep() - 1, vertexIndex);
BasicVertex<I, V, E, M> originalVertex =
- vertexRange.getVertexMap().get(vertexIndex);
+ service.getVertex(vertexIndex);
List<M> msgList = inMessages.get(vertexIndex);
if (originalVertex != null) {
msgList = originalVertex.getMsgList();
@@ -942,12 +967,19 @@ end[HADOOP_FACEBOOK]*/
vertexMutations);
}
+ Partition<I, V, E, M> partition =
+ service.getPartition(vertexIndex);
+ if (partition == null) {
+ throw new IllegalStateException(
+ "prepareSuperstep: No partition for index " + vertexIndex +
+ " in " + service.getPartitionMap() + " should have been " +
+ service.getVertexPartitionOwner(vertexIndex));
+ }
if (vertex != null) {
((MutableVertex<I, V, E, M>) vertex).setVertexId(vertexIndex);
- vertexRange.getVertexMap().put(vertex.getVertexId(),
- (Vertex<I, V, E, M>) vertex);
+ partition.putVertex((Vertex<I, V, E, M>) vertex);
} else if (originalVertex != null) {
- vertexRange.getVertexMap().remove(originalVertex.getVertexId());
+ partition.removeVertex(originalVertex.getVertexId());
}
}
synchronized (inVertexMutationsMap) {
@@ -956,25 +988,27 @@ end[HADOOP_FACEBOOK]*/
}
@Override
- public void cleanCachedVertexAddressMap() {
- // Fix all the cached inet addresses (remove all changed entries)
- synchronized (vertexIndexMapAddressMap) {
- for (Entry<I, VertexRange<I, V, E, M>> entry :
- service.getVertexRangeMap().entrySet()) {
- if (vertexIndexMapAddressMap.containsKey(entry.getKey())) {
- InetSocketAddress address =
- vertexIndexMapAddressMap.get(entry.getKey());
- if (!address.getHostName().equals(
- entry.getValue().getHostname()) ||
- address.getPort() !=
- entry.getValue().getPort()) {
- LOG.info("prepareSuperstep: Vertex range " +
- entry.getKey() + " changed from " +
- address + " to " +
- entry.getValue().getHostname() + ":" +
- entry.getValue().getPort());
- vertexIndexMapAddressMap.remove(entry.getKey());
+ public void fixPartitionIdToSocketAddrMap() {
+ // 1. Fix all the cached inet addresses (remove all changed entries)
+ // 2. Connect to any new RPC servers
+ synchronized (partitionIndexAddressMap) {
+ for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
+ InetSocketAddress address =
+ partitionIndexAddressMap.get(
+ partitionOwner.getPartitionId());
+ if (address != null &&
+ (!address.getHostName().equals(
+ partitionOwner.getWorkerInfo().getHostname()) ||
+ address.getPort() !=
+ partitionOwner.getWorkerInfo().getPort())) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fixPartitionIdToSocketAddrMap: " +
+ "Partition owner " +
+ partitionOwner + " changed from " +
+ address);
}
+ partitionIndexAddressMap.remove(
+ partitionOwner.getPartitionId());
}
}
}
@@ -993,8 +1027,7 @@ end[HADOOP_FACEBOOK]*/
}
@Override
- public Map<I, List<BasicVertex<I, V, E, M>>> getInVertexRangeMap() {
- return inVertexRangeMap;
+ public Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap() {
+ return inPartitionVertexMap;
}
-
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Tue Nov 15 00:54:20 2011
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.VertexRange;
/*if_not[HADOOP]
else[HADOOP]*/
import org.apache.giraph.hadoop.BspTokenSelector;
@@ -80,12 +79,11 @@ public interface CommunicationsInterface
/**
* Adds vertex list (index, value, edges, etc.) to the appropriate worker.
*
- * @param vertexIndexMax Max vertex index of {@link VertexRange}
+ * @param partitionId Partition id of the vertices to be added.
* @param vertexList List of vertices to add
*/
- void putVertexList(I vertexIndexMax,
- VertexList<I, V, E, M> vertexList)
- throws IOException;
+ void putVertexList(int partitionId,
+ VertexList<I, V, E, M> vertexList) throws IOException;
/**
* Add an edge to a remote vertex
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Tue Nov 15 00:54:20 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.security.token.
import org.apache.log4j.Logger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.GraphState;
import org.apache.giraph.hadoop.BspPolicyProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -63,7 +64,8 @@ else[HADOOP]*/
public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E, M> service)
+ CentralizedServiceWorker<I, V, E, M> service,
+ GraphState<I, V, E, M> graphState)
throws IOException, UnknownHostException, InterruptedException {
super(context, service);
}
@@ -99,7 +101,7 @@ else[HADOOP]*/
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
if (conf.getBoolean(
hadoopSecurityAuthorization,
- false)) {
+ false)) {
ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
}
JobTokenSecretManager jobTokenSecretManager =
@@ -158,7 +160,7 @@ else[HADOOP]*/
CommunicationsInterface.class, versionID, addr, config);
}
});
- return proxy;
+ return proxy;
/*end[HADOOP]*/
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java Tue Nov 15 00:54:20 2011
@@ -35,6 +35,10 @@ public interface ServerInterface<I exten
M extends Writable>
extends Closeable,
WorkerCommunications<I, V, E, M> {
+ /**
+ * Setup the server.
+ */
+ void setup();
/**
* Move the in transition messages to the in messages for every vertex and
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java Tue Nov 15 00:54:20 2011
@@ -21,6 +21,9 @@ package org.apache.giraph.comm;
import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
+
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -35,18 +38,17 @@ import java.util.Map;
* @param <V extends Writable> vertex value
* @param <E extends Writable> edge value
* @param <M extends Writable> message data
- *
- **/
+ */
@SuppressWarnings("rawtypes")
public interface WorkerCommunications<I extends WritableComparable,
V extends Writable,
E extends Writable,
M extends Writable> {
/**
- * Clean the cached map of vertex addresses that have changed
- * because of rebalancing.
+ * Fix changes to the workers and the mapping between partitions and
+ * workers.
*/
- void cleanCachedVertexAddressMap();
+ void fixPartitionIdToSocketAddrMap();
/**
* Sends a message to destination vertex.
@@ -57,13 +59,13 @@ public interface WorkerCommunications<I
void sendMessageReq(I id, M msg);
/**
- * Sends a list of vertices to the appropriate vertex range owner
+ * Sends a partition to the appropriate partition owner
*
- * @param vertexIndexMax Vertex range that the vertices belong to
- * @param vertexList List of vertices assigned to the vertexRangeIndex
+ * @param workerInfo Owner the vertices belong to
+ * @param partition Partition to send
*/
- void sendVertexListReq(I vertexIndexMax,
- List<BasicVertex<I, V, E, M>> vertexList);
+ void sendPartitionReq(WorkerInfo workerInfo,
+ Partition<I, V, E, M> partition);
/**
* Sends a request to the appropriate vertex range owner to add an edge
@@ -107,5 +109,5 @@ public interface WorkerCommunications<I
*
- * @return map of vertex ranges to vertices
+ * @return map of partition ids to vertices
*/
- Map<I, List<BasicVertex<I, V, E, M>>> getInVertexRangeMap();
+ Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java Tue Nov 15 00:54:20 2011
@@ -42,10 +42,8 @@ public abstract class GeneratedVertexInp
@Override
public List<InputSplit> getSplits(JobContext context, int numWorkers)
throws IOException, InterruptedException {
- /*
- * This is meaningless, the VertexReader will generate all the test
- * data.
- */
+ // This is meaningless, the VertexReader will generate all the test
+ // data.
List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
for (int i = 0; i < numWorkers; ++i) {
inputSplitList.add(new BspInputSplit(i, numWorkers));
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Tue Nov 15 00:54:20 2011
@@ -46,12 +46,17 @@ public abstract class GeneratedVertexRea
protected long totalRecords = 0;
/** The input split from initialize(). */
protected BspInputSplit inputSplit = null;
+ /** Reverse the id order? */
+ protected boolean reverseIdOrder;
protected Configuration configuration = null;
public static final String READER_VERTICES =
- "TestVertexReader.reader_vertices";
+ "GeneratedVertexReader.reader_vertices";
public static final long DEFAULT_READER_VERTICES = 10;
+ public static final String REVERSE_ID_ORDER =
+ "GeneratedVertexReader.reverseIdOrder";
+ public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
public GeneratedVertexReader() {
}
@@ -62,8 +67,11 @@ public abstract class GeneratedVertexRea
throws IOException {
configuration = context.getConfiguration();
totalRecords = configuration.getLong(
- GeneratedVertexReader.READER_VERTICES,
- GeneratedVertexReader.DEFAULT_READER_VERTICES);
+ GeneratedVertexReader.READER_VERTICES,
+ GeneratedVertexReader.DEFAULT_READER_VERTICES);
+ reverseIdOrder = configuration.getBoolean(
+ GeneratedVertexReader.REVERSE_ID_ORDER,
+ GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
this.inputSplit = (BspInputSplit) inputSplit;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java Tue Nov 15 00:54:20 2011
@@ -49,5 +49,5 @@ public class MaxAggregator implements Ag
public DoubleWritable createAggregatedValue() {
return new DoubleWritable();
}
-
+
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java Tue Nov 15 00:54:20 2011
@@ -35,7 +35,7 @@ public class MinAggregator implements Ag
double val = value.get();
if (val < min) {
min = val;
- }
+ }
}
public void setAggregatedValue(DoubleWritable value) {
@@ -49,5 +49,5 @@ public class MinAggregator implements Ag
public DoubleWritable createAggregatedValue() {
return new DoubleWritable();
}
-
+
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Tue Nov 15 00:54:20 2011
@@ -54,12 +54,13 @@ public class SimpleMutateGraphVertex ext
}
@Override
- public void compute(Iterator<DoubleWritable> msgIterator) throws IOException {
+ public void compute(Iterator<DoubleWritable> msgIterator)
+ throws IOException {
SimpleMutateGraphVertexWorkerContext workerContext =
(SimpleMutateGraphVertexWorkerContext) getWorkerContext();
-
- if (getSuperstep() == 1) {
+ if (getSuperstep() == 0) {
+ } else if (getSuperstep() == 1) {
// Send messages to vertices that are sure not to exist
// (creating them)
LongWritable destVertexId =
@@ -67,18 +68,18 @@ public class SimpleMutateGraphVertex ext
sendMsg(destVertexId, new DoubleWritable(0.0));
} else if (getSuperstep() == 2) {
} else if (getSuperstep() == 3) {
- long vertex_count = workerContext.getVertexCount();
- if (vertex_count * 2 != getNumVertices()) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount * 2 != getNumVertices()) {
throw new IllegalStateException(
"Impossible to have " + getNumVertices() +
- " vertices when should have " + vertex_count * 2 +
+ " vertices when should have " + vertexCount * 2 +
" on superstep " + getSuperstep());
}
- long edge_count = workerContext.getEdgeCount();
- if (edge_count != getNumEdges()) {
+ long edgeCount = workerContext.getEdgeCount();
+ if (edgeCount != getNumEdges()) {
throw new IllegalStateException(
"Impossible to have " + getNumEdges() +
- " edges when should have " + edge_count +
+ " edges when should have " + edgeCount +
" on superstep " + getSuperstep());
}
// Create vertices that are sure not to exist (doubling vertices)
@@ -94,18 +95,18 @@ public class SimpleMutateGraphVertex ext
getVertexId(), new FloatWritable(0.0f)));
} else if (getSuperstep() == 4) {
} else if (getSuperstep() == 5) {
- long vertex_count = workerContext.getVertexCount();
- if (vertex_count * 2 != getNumVertices()) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount * 2 != getNumVertices()) {
throw new IllegalStateException(
"Impossible to have " + getNumVertices() +
- " when should have " + vertex_count * 2 +
+ " vertices when should have " + vertexCount * 2 +
" on superstep " + getSuperstep());
}
- long edge_count = workerContext.getEdgeCount();
- if (edge_count + vertex_count != getNumEdges()) {
+ long edgeCount = workerContext.getEdgeCount();
+ if (edgeCount + vertexCount != getNumEdges()) {
throw new IllegalStateException(
"Impossible to have " + getNumEdges() +
- " edges when should have " + edge_count + vertex_count +
+ " edges when should have " + (edgeCount + vertexCount) +
" on superstep " + getSuperstep());
}
// Remove the edges created in superstep 3
@@ -141,7 +142,8 @@ public class SimpleMutateGraphVertex ext
}
}
- public static class SimpleMutateGraphVertexWorkerContext extends WorkerContext {
+ public static class SimpleMutateGraphVertexWorkerContext
+ extends WorkerContext {
/** Cached vertex count */
private long vertexCount;
/** Cached edge count */
@@ -158,8 +160,11 @@ public class SimpleMutateGraphVertex ext
@Override
public void postApplication() { }
+ @Override
+ public void preSuperstep() { }
+
@Override
- public void preSuperstep() {
+ public void postSuperstep() {
vertexCount = getNumVertices();
edgeCount = getNumEdges();
if (getSuperstep() == 1) {
@@ -172,9 +177,6 @@ public class SimpleMutateGraphVertex ext
edgesRemoved = 0;
}
- @Override
- public void postSuperstep() { }
-
public long getVertexCount() {
return vertexCount;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Nov 15 00:54:20 2011
@@ -56,7 +56,8 @@ public class SimpleSuperstepVertex exten
* Simple VertexReader that supports {@link SimpleSuperstepVertex}
*/
public static class SimpleSuperstepVertexReader extends
- GeneratedVertexReader<LongWritable, IntWritable, FloatWritable, IntWritable> {
+ GeneratedVertexReader<LongWritable, IntWritable,
+ FloatWritable, IntWritable> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SimpleSuperstepVertexReader.class);
@@ -70,18 +71,28 @@ public class SimpleSuperstepVertex exten
}
@Override
- public BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> getCurrentVertex()
- throws IOException, InterruptedException {
- BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
- BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable>createVertex(
+ public BasicVertex<LongWritable, IntWritable, FloatWritable,
+ IntWritable> getCurrentVertex()
+ throws IOException, InterruptedException {
+ BasicVertex<LongWritable, IntWritable,
+ FloatWritable, IntWritable> vertex =
+ BspUtils.<LongWritable, IntWritable,
+ FloatWritable, IntWritable>createVertex(
configuration);
- LongWritable vertexId = new LongWritable(
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
- IntWritable vertexValue = new IntWritable((int) (vertexId.get() * 10));
+ long tmpId = reverseIdOrder ?
+ ((inputSplit.getSplitIndex() + 1) * totalRecords) -
+ recordsRead - 1 :
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
+ LongWritable vertexId = new LongWritable(tmpId);
+ IntWritable vertexValue =
+ new IntWritable((int) (vertexId.get() * 10));
Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
- long destVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
+ long destVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
float edgeValue = vertexId.get() * 100f;
- edgeMap.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+ edgeMap.put(new LongWritable(destVertexId),
+ new FloatWritable(edgeValue));
vertex.initialize(vertexId, vertexValue, edgeMap, null);
++recordsRead;
if (LOG.isInfoEnabled()) {
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.*;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An example that simply uses its id, value, and edges to compute new data
+ * every iteration to verify that messages are sent and received at the
+ * appropriate location and superstep.
+ */
+public class VerifyMessage {
+    /**
+     * Message carrying the superstep it was sent on and the id of the
+     * sending vertex, so that the receiver can verify both.
+     */
+    public static class VerifiableMessage implements Writable {
+        /** Superstep sent on */
+        public long superstep;
+        /** Source vertex id */
+        public long sourceVertexId;
+        /** Value */
+        public float value;
+
+        /** Empty constructor; fields are filled by {@link #readFields}. */
+        public VerifiableMessage() {}
+
+        /**
+         * Create a message.
+         *
+         * @param superstep Superstep the message is sent on
+         * @param sourceVertexId Id of the sending vertex
+         * @param value Message value
+         */
+        public VerifiableMessage(
+                long superstep, long sourceVertexId, float value) {
+            this.superstep = superstep;
+            this.sourceVertexId = sourceVertexId;
+            this.value = value;
+        }
+
+        @Override
+        public void readFields(DataInput input) throws IOException {
+            superstep = input.readLong();
+            sourceVertexId = input.readLong();
+            value = input.readFloat();
+        }
+
+        @Override
+        public void write(DataOutput output) throws IOException {
+            // Field order must match readFields()
+            output.writeLong(superstep);
+            output.writeLong(sourceVertexId);
+            output.writeFloat(value);
+        }
+
+        @Override
+        public String toString() {
+            return "(superstep=" + superstep + ",sourceVertexId=" +
+                sourceVertexId + ",value=" + value + ")";
+        }
+    }
+
+    /**
+     * Vertex that adds its id to a {@link LongSumAggregator}, adds the
+     * values of its incoming messages to its vertex value, checks that
+     * each message came from the previous superstep and the previous
+     * vertex id, and sends updated edge values to its neighbors.
+     */
+    public static class VerifyMessageVertex extends
+            Vertex<LongWritable, IntWritable, FloatWritable, VerifiableMessage> {
+        /** User can access this after the application finishes if local */
+        public static long finalSum;
+        /** Number of supersteps to run (6 by default) */
+        private static int supersteps = 6;
+        /** Class logger */
+        private static final Logger LOG =
+            Logger.getLogger(VerifyMessageVertex.class);
+
+        /** Dynamically set number of supersteps */
+        public static final String SUPERSTEP_COUNT =
+            "verifyMessageVertex.superstepCount";
+
+        /**
+         * Registers and zeroes the sum aggregator, reads the superstep
+         * count from the configuration, enables the aggregator each
+         * superstep and publishes its final value to {@code finalSum}.
+         */
+        public static class VerifyMessageVertexWorkerContext extends
+                WorkerContext {
+            @Override
+            public void preApplication() throws InstantiationException,
+                    IllegalAccessException {
+                registerAggregator(LongSumAggregator.class.getName(),
+                    LongSumAggregator.class);
+                LongSumAggregator sumAggregator = (LongSumAggregator)
+                    getAggregator(LongSumAggregator.class.getName());
+                sumAggregator.setAggregatedValue(new LongWritable(0));
+                supersteps = getContext().getConfiguration().getInt(
+                    SUPERSTEP_COUNT, supersteps);
+            }
+
+            @Override
+            public void postApplication() {
+                LongSumAggregator sumAggregator = (LongSumAggregator)
+                    getAggregator(LongSumAggregator.class.getName());
+                finalSum = sumAggregator.getAggregatedValue().get();
+            }
+
+            @Override
+            public void preSuperstep() {
+                useAggregator(LongSumAggregator.class.getName());
+            }
+
+            @Override
+            public void postSuperstep() {}
+        }
+
+        /**
+         * Votes to halt once the current superstep exceeds the configured
+         * count; otherwise verifies and sums the incoming messages and
+         * sends updated edge values to every neighbor.
+         *
+         * @param msgIterator Messages delivered to this vertex
+         * @throws IllegalStateException if a message was not sent on the
+         *         previous superstep or not by the previous vertex id
+         */
+        @Override
+        public void compute(Iterator<VerifiableMessage> msgIterator) {
+            LongSumAggregator sumAggregator = (LongSumAggregator)
+                getAggregator(LongSumAggregator.class.getName());
+            if (getSuperstep() > supersteps) {
+                voteToHalt();
+                return;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("compute: " + sumAggregator);
+            }
+            sumAggregator.aggregate(getVertexId().get());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("compute: sum = " +
+                    sumAggregator.getAggregatedValue().get() +
+                    " for vertex " + getVertexId());
+            }
+            float msgValue = 0.0f;
+            while (msgIterator.hasNext()) {
+                VerifiableMessage msg = msgIterator.next();
+                msgValue += msg.value;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("compute: got msg = " + msg +
+                        " for vertex id " + getVertexId() +
+                        ", vertex value " + getVertexValue() +
+                        " on superstep " + getSuperstep());
+                }
+                if (msg.superstep != getSuperstep() - 1) {
+                    throw new IllegalStateException(
+                        "compute: Impossible to not get a message from " +
+                        "the previous superstep, current superstep = " +
+                        getSuperstep() + ", message = " + msg);
+                }
+                // Vertex 0 is exempt from the source check. NOTE(review):
+                // presumably it is fed by the wrap-around edge from the
+                // highest vertex id -- confirm against the input used.
+                if ((msg.sourceVertexId != getVertexId().get() - 1) &&
+                    (getVertexId().get() != 0)) {
+                    throw new IllegalStateException(
+                        "compute: Impossible that this message didn't come " +
+                        "from the previous vertex and came from " +
+                        msg.sourceVertexId);
+                }
+            }
+            // Value before this superstep's messages; used below for edges
+            int vertexValue = getVertexValue().get();
+            setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("compute: vertex " + getVertexId() +
+                    " has value " + getVertexValue() +
+                    " on superstep " + getSuperstep());
+            }
+            for (LongWritable targetVertexId : this) {
+                FloatWritable edgeValue = getEdgeValue(targetVertexId);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("compute: vertex " + getVertexId() +
+                        " sending edgeValue " + edgeValue +
+                        " vertexValue " + vertexValue +
+                        " total " +
+                        (edgeValue.get() + (float) vertexValue) +
+                        " to vertex " + targetVertexId +
+                        " on superstep " + getSuperstep());
+                }
+                // Increase the edge value, re-add the edge with it and send
+                // the new value to the neighbor
+                edgeValue.set(edgeValue.get() + (float) vertexValue);
+                addEdge(targetVertexId, edgeValue);
+                sendMsg(targetVertexId,
+                    new VerifiableMessage(
+                        getSuperstep(), getVertexId().get(), edgeValue.get()));
+            }
+        }
+    }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Tue Nov 15 00:54:20 2011
@@ -45,8 +45,11 @@ public abstract class BasicVertex<I exte
private GraphState<I,V,E,M> graphState;
/** Configuration */
private Configuration conf;
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
- public abstract void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
+ public abstract void initialize(
+ I vertexId, V vertexValue, Map<I, E> edges, List<M> messages);
/**
* Must be defined by user to do computation on a single Vertex.
@@ -151,7 +154,7 @@ public abstract class BasicVertex<I exte
throw new IllegalArgumentException(
"sendMsg: Cannot send null message to " + id);
}
- getGraphState().getGraphMapper().getWorkerCommunications().
+ getGraphState().getWorkerCommunications().
sendMessageReq(id, msg);
}
@@ -162,16 +165,20 @@ public abstract class BasicVertex<I exte
/**
* After this is called, the compute() code will no longer be called for
- * this vertice unless a message is sent to it. Then the compute() code
+ * this vertex unless a message is sent to it. Then the compute() code
* will be called once again until this function is called. The
* application finishes only when all vertices vote to halt.
*/
- public abstract void voteToHalt();
+ public void voteToHalt() {
+ halt = true;
+ }
/**
* Is this vertex done?
*/
- public abstract boolean isHalted();
+ public boolean isHalted() {
+ return halt;
+ }
/**
* Get the list of incoming messages from the previous superstep. Same as
@@ -202,13 +209,13 @@ public abstract class BasicVertex<I exte
*
* @return Mapper context
*/
- public Mapper.Context getContext() {
- return getGraphState().getContext();
- }
-
+ public Mapper.Context getContext() {
+ return getGraphState().getContext();
+ }
+
/**
* Get the worker context
- *
+ *
* @return WorkerContext context
*/
public WorkerContext getWorkerContext() {
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Tue Nov 15 00:54:20 2011
@@ -19,6 +19,7 @@
package org.apache.giraph.graph;
import org.apache.giraph.bsp.CentralizedService;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.giraph.zk.ZooKeeperExt;
@@ -36,7 +37,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -48,8 +48,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.TreeMap;
/**
@@ -75,15 +73,10 @@ public abstract class BspService <
/** InputSplit reservation or finished notification and synchronization */
private final BspEvent inputSplitsStateChanged =
new PredicateLock();
- /** Are the worker assignments of vertex ranges ready? */
- private final BspEvent vertexRangeAssignmentsReadyChanged =
- new PredicateLock();
- /** Have the vertex range exchange children changed? */
- private final BspEvent vertexRangeExchangeChildrenChanged =
- new PredicateLock();
- /** Are the vertex range exchanges done? */
- private final BspEvent vertexRangeExchangeFinishedChanged =
+ /** Are the partition assignments to workers ready? */
+ private final BspEvent partitionAssignmentsReadyChanged =
new PredicateLock();
+
/** Application attempt changed */
private final BspEvent applicationAttemptChanged =
new PredicateLock();
@@ -117,6 +110,8 @@ public abstract class BspService <
private final String hostname;
/** Combination of hostname '_' partition (unique id) */
private final String hostnamePartitionId;
+ /** Graph partitioner */
+ private final GraphPartitionerFactory<I, V, E, M> graphPartitionerFactory;
/** Mapper that will do the graph computation */
private final GraphMapper<I, V, E, M> graphMapper;
/** Class logger */
@@ -125,11 +120,6 @@ public abstract class BspService <
private final FileSystem fs;
/** Checkpoint frequency */
private int checkpointFrequency = -1;
- /** Vertex range map based on the superstep below */
- private NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
- new TreeMap<I, VertexRange<I, V, E, M>>();
- /** Vertex range set is based on this superstep */
- private long vertexRangeSuperstep = UNSET_SUPERSTEP;
/** Map of aggregators */
private Map<String, Aggregator<Writable>> aggregatorMap =
new TreeMap<String, Aggregator<Writable>>();
@@ -159,19 +149,17 @@ public abstract class BspService <
public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
- public static final String VERTEX_RANGE_ASSIGNMENTS_DIR =
- "/_vertexRangeAssignments";
- public static final String VERTEX_RANGE_EXCHANGE_DIR =
- "/_vertexRangeExchangeDir";
- public static final String VERTEX_RANGE_EXCHANGED_FINISHED_NODE =
- "/_vertexRangeExchangeFinished";
+ public static final String PARTITION_ASSIGNMENTS_DIR =
+ "/_partitionAssignments";
+ public static final String PARTITION_EXCHANGE_DIR =
+ "/_partitionExchangeDir";
public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished";
public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY =
"_aggregatorValueArrayKey";
- public static final String JSONOBJ_VERTEX_RANGE_STAT_ARRAY_KEY =
- "_vertexRangeStatArrayKey";
+ public static final String JSONOBJ_PARTITION_STATS_KEY =
+ "_partitionStatsKey";
public static final String JSONOBJ_FINISHED_VERTICES_KEY =
"_verticesFinishedKey";
public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey";
@@ -270,31 +258,36 @@ public abstract class BspService <
}
/**
- * Generate the worker "healthy" directory path for a superstep
+ * Generate the worker information "healthy" directory path for a
+ * superstep
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the a superstep
*/
- final public String getWorkerHealthyPath(long attempt, long superstep) {
+ final public String getWorkerInfoHealthyPath(long attempt,
+ long superstep) {
return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR;
}
/**
- * Generate the worker "unhealthy" directory path for a superstep
+ * Generate the worker information "unhealthy" directory path for a
+ * superstep
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the a superstep
*/
- final public String getWorkerUnhealthyPath(long attempt, long superstep) {
+ final public String getWorkerInfoUnhealthyPath(long attempt,
+ long superstep) {
return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR;
}
/**
- * Generate the worker "finished" directory path for a superstep
+ * Generate the worker "finished" directory path for a
+ * superstep
*
* @param attempt application attempt number
* @param superstep superstep to use
@@ -306,44 +299,36 @@ public abstract class BspService <
}
/**
- * Generate the "vertex range assignments" directory path for a superstep
+ * Generate the "partition assignments" directory path for a superstep
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the a superstep
*/
- final public String getVertexRangeAssignmentsPath(long attempt,
- long superstep) {
+ final public String getPartitionAssignmentsPath(long attempt,
+ long superstep) {
return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
- SUPERSTEP_DIR + "/" + superstep + VERTEX_RANGE_ASSIGNMENTS_DIR;
+ SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR;
}
/**
- * Generate the "vertex range exchange" directory path for a superstep
+ * Generate the "partition exchange" directory path for a superstep
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the a superstep
*/
- final public String getVertexRangeExchangePath(long attempt,
- long superstep) {
+ final public String getPartitionExchangePath(long attempt,
+ long superstep) {
return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
- SUPERSTEP_DIR + "/" + superstep + VERTEX_RANGE_EXCHANGE_DIR;
+ SUPERSTEP_DIR + "/" + superstep + PARTITION_EXCHANGE_DIR;
}
- /**
- * Generate the "vertex range exchange finished" directory path for
- * a superstep
- *
- * @param attempt application attempt number
- * @param superstep superstep to use
- * @return directory path based on the a superstep
- */
- final public String getVertexRangeExchangeFinishedPath(long attempt,
- long superstep) {
- return APPLICATION_ATTEMPTS_PATH + "/" + attempt +
- SUPERSTEP_DIR + "/" + superstep +
- VERTEX_RANGE_EXCHANGED_FINISHED_NODE;
+ /**
+ * Generate the per-worker path under the "partition exchange" directory
+ * for a superstep
+ *
+ * @param attempt application attempt number
+ * @param superstep superstep to use
+ * @param workerInfo worker whose hostname id names the per-worker path
+ * @return partition exchange directory path for the superstep, followed
+ * by "/" and the worker's hostname id
+ */
+ final public String getPartitionExchangeWorkerPath(long attempt,
+ long superstep,
+ WorkerInfo workerInfo) {
+ return getPartitionExchangePath(attempt, superstep) +
+ "/" + workerInfo.getHostnameId();
}
/**
@@ -516,17 +501,10 @@ public abstract class BspService <
return inputSplitsStateChanged;
}
- final public BspEvent getVertexRangeAssignmentsReadyChangedEvent() {
- return vertexRangeAssignmentsReadyChanged;
- }
-
- final public BspEvent getVertexRangeExchangeChildrenChangedEvent() {
- return vertexRangeExchangeChildrenChanged;
+ /**
+ * Get the event that is signaled when a NodeCreated event arrives for a
+ * path containing PARTITION_ASSIGNMENTS_DIR (partitions are assigned)
+ *
+ * @return the partition assignments ready event
+ */
+ final public BspEvent getPartitionAssignmentsReadyChangedEvent() {
+ return partitionAssignmentsReadyChanged;
}
- final public BspEvent getVertexRangeExchangeFinishedChangedEvent() {
- return vertexRangeExchangeFinishedChanged;
- }
final public BspEvent getApplicationAttemptChangedEvent() {
return applicationAttemptChanged;
@@ -597,9 +575,7 @@ public abstract class BspService <
registerBspEvent(workerHealthRegistrationChanged);
registerBspEvent(inputSplitsAllReadyChanged);
registerBspEvent(inputSplitsStateChanged);
- registerBspEvent(vertexRangeAssignmentsReadyChanged);
- registerBspEvent(vertexRangeExchangeChildrenChanged);
- registerBspEvent(vertexRangeExchangeFinishedChanged);
+ registerBspEvent(partitionAssignmentsReadyChanged);
registerBspEvent(applicationAttemptChanged);
registerBspEvent(superstepFinished);
registerBspEvent(masterElectionChildrenChanged);
@@ -625,6 +601,9 @@ public abstract class BspService <
throw new RuntimeException(e);
}
this.hostnamePartitionId = hostname + "_" + getTaskPartition();
+ this.graphPartitionerFactory =
+ BspUtils.<I, V, E, M>createGraphPartitioner(conf);
+
this.checkpointFrequency =
conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
@@ -807,93 +786,6 @@ public abstract class BspService <
}
/**
- * Gets the storable vertex range map, bypasses the cache. Used by workers
- * to dump the vertices into.
- *
- * @return Actual map of max vertex range indices to vertex ranges
- */
- public NavigableMap<I, VertexRange<I, V, E, M>>
- getStorableVertexRangeMap() {
- return vertexRangeMap;
- }
-
- /**
- * Based on a superstep, get the mapping of vertex range maxes to vertex
- * ranges. This can be used to look up a particular vertex.
- *
- * @param superstep Superstep to get the vertex ranges for
- * @return Cached map of max vertex range indices to vertex ranges
- */
- public NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap(
- long superstep) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getVertexRangeMap: Current superstep = " +
- getSuperstep() + ", desired superstep = " + superstep);
- }
-
- if (vertexRangeSuperstep == superstep) {
- return vertexRangeMap;
- }
- vertexRangeSuperstep = superstep;
- NavigableMap<I, VertexRange<I, V, E, M>> nextVertexRangeMap =
- new TreeMap<I, VertexRange<I, V, E, M>>();
- String vertexRangeAssignmentsPath =
- getVertexRangeAssignmentsPath(getApplicationAttempt(),
- superstep);
- try {
- JSONArray vertexRangeAssignmentsArray =
- new JSONArray(
- new String(getZkExt().getData(vertexRangeAssignmentsPath,
- false,
- null)));
- if (LOG.isDebugEnabled()) {
- LOG.debug("getVertexRangeSet: Found vertex ranges " +
- vertexRangeAssignmentsArray.toString() +
- " on superstep " + superstep);
- }
- for (int i = 0; i < vertexRangeAssignmentsArray.length(); ++i) {
- JSONObject vertexRangeObj =
- vertexRangeAssignmentsArray.getJSONObject(i);
- Class<I> indexClass =
- BspUtils.getVertexIndexClass(getConfiguration());
- VertexRange<I, V, E, M> vertexRange =
- new VertexRange<I, V, E, M>(indexClass,
- vertexRangeObj);
- if (nextVertexRangeMap.containsKey(vertexRange.getMaxIndex())) {
- throw new IllegalStateException(
- "getVertexRangeMap: Impossible that vertex range " +
- "max " + vertexRange.getMaxIndex() +
- " already exists! Duplicate vertex ranges include " +
- nextVertexRangeMap.get(vertexRange.getMaxIndex()) +
- " and " + vertexRange);
- }
- nextVertexRangeMap.put(vertexRange.getMaxIndex(), vertexRange);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- // Copy over the vertices to the vertex ranges
- for (Entry<I, VertexRange<I, V, E, M>> entry :
- nextVertexRangeMap.entrySet()) {
- if (!vertexRangeMap.containsKey(entry.getKey())) {
- continue;
- }
- VertexRange<I, V, E, M> vertexRange =
- vertexRangeMap.get(entry.getKey());
- entry.getValue().getVertexMap().putAll(
- vertexRange.getVertexMap());
- }
- vertexRangeMap = nextVertexRangeMap;
- return vertexRangeMap;
- }
-
- public NavigableMap<I, VertexRange<I, V, E, M>> getCurrentVertexRangeMap()
- {
- return vertexRangeMap;
- }
-
- /**
* Register an aggregator with name.
*
* @param name Name of the aggregator
@@ -948,6 +840,15 @@ public abstract class BspService <
}
/**
+ * Get the graph partitioner factory, which is obtained from
+ * BspUtils.createGraphPartitioner(conf). Subclasses can use this to
+ * instantiate their respective partitioners.
+ *
+ * @return Instantiated graph partitioner factory
+ */
+ protected GraphPartitionerFactory<I, V, E, M> getGraphPartitionerFactory() {
+ return graphPartitionerFactory;
+ }
+
+ /**
* Derived classes that want additional ZooKeeper events to take action
* should override this.
*
@@ -982,6 +883,8 @@ public abstract class BspService <
LOG.info("process: Asynchronous connection complete.");
}
connectedEvent.signal();
+ } else {
+ LOG.warn("process: Got unknown null path event " + event);
}
return;
}
@@ -1025,30 +928,13 @@ public abstract class BspService <
}
inputSplitsStateChanged.signal();
eventProcessed = true;
- } else if (event.getPath().contains(VERTEX_RANGE_ASSIGNMENTS_DIR) &&
- event.getType() == EventType.NodeCreated) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexRangeAssignmentsReadyChanged " +
- "(vertex ranges are assigned)");
- }
- vertexRangeAssignmentsReadyChanged.signal();
- eventProcessed = true;
- } else if (event.getPath().contains(VERTEX_RANGE_EXCHANGE_DIR) &&
- event.getType() == EventType.NodeChildrenChanged) {
- if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexRangeExchangeChildrenChanged " +
- "(ready to exchanged vertex ranges)");
- }
- vertexRangeExchangeChildrenChanged.signal();
- eventProcessed = true;
- } else if (event.getPath().contains(
- VERTEX_RANGE_EXCHANGED_FINISHED_NODE) &&
+ } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) &&
event.getType() == EventType.NodeCreated) {
if (LOG.isInfoEnabled()) {
- LOG.info("process: vertexRangeExchangeFinishedChanged " +
- "(vertex range exchange done)");
+ LOG.info("process: partitionAssignmentsReadyChanged " +
+ "(partitions are assigned)");
}
- vertexRangeExchangeFinishedChanged.signal();
+ partitionAssignmentsReadyChanged.signal();
eventProcessed = true;
} else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
event.getType() == EventType.NodeCreated) {