You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/11/15 01:54:22 UTC
svn commit: r1201987 [3/5] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 15 00:54:20 2011
@@ -19,8 +19,19 @@
package org.apache.giraph.graph;
import net.iharder.Base64;
+
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.RPCCommunications;
+import org.apache.giraph.comm.ServerInterface;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionExchange;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -47,15 +58,16 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.TreeSet;
/**
@@ -71,38 +83,63 @@ public class BspServiceWorker<
implements CentralizedServiceWorker<I, V, E, M> {
/** Number of input splits */
private int inputSplitCount = -1;
- /** Cached aggregate number of vertices in the entire application */
- private long totalVertices = -1;
- /** Cached aggregate number of edges in the entire application */
- private long totalEdges = -1;
/** My process health znode */
private String myHealthZnode;
- /** Final server RPC port */
- private final int finalRpcPort;
/** List of aggregators currently in use */
private Set<String> aggregatorInUse = new TreeSet<String>();
+ /** Worker info */
+ private final WorkerInfo workerInfo;
+ /** Worker graph partitioner */
+ private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+ /** Input split vertex cache (only used when loading from input split) */
+ private final Map<PartitionOwner, Partition<I, V, E, M>>
+ inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
+ /** Communication service */
+ private final ServerInterface<I, V, E, M> commService;
+ /** Structure to store the partitions on this worker */
+ private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
+ new HashMap<Integer, Partition<I, V, E, M>>();
+ /** Have the partition exchange children (workers) changed? */
+ private final BspEvent partitionExchangeChildrenChanged =
+ new PredicateLock();
+ /** Max vertices per partition before sending */
+ private final int maxVerticesPerPartition;
/** Worker Context */
- private WorkerContext workerContext;
+ private final WorkerContext workerContext;
+ /** Total vertices loaded */
+ private long totalVerticesLoaded = 0;
+ /** Total edges loaded */
+ private long totalEdgesLoaded = 0;
/** Class logger */
private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
- public BspServiceWorker(String serverPortList,
- int sessionMsecTimeout,
- Mapper<?, ?, ?, ?>.Context context,
- GraphMapper<I, V, E, M> graphMapper) {
+ public BspServiceWorker(
+ String serverPortList,
+ int sessionMsecTimeout,
+ Mapper<?, ?, ?, ?>.Context context,
+ GraphMapper<I, V, E, M> graphMapper,
+ GraphState<I, V, E,M> graphState)
+ throws UnknownHostException, IOException, InterruptedException {
super(serverPortList, sessionMsecTimeout, context, graphMapper);
- this.finalRpcPort =
+ int finalRpcPort =
getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
- GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
- getTaskPartition();
- this.workerContext = BspUtils.createWorkerContext(getConfiguration(),
+ GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+ getTaskPartition();
+ maxVerticesPerPartition=
+ getConfiguration().getInt(
+ GiraphJob.MAX_VERTICES_PER_PARTITION,
+ GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
+ workerInfo =
+ new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
+ workerGraphPartitioner =
+ getGraphPartitionerFactory().createWorkerGraphPartitioner();
+ commService = new RPCCommunications<I, V, E, M>(
+ context, this, graphState);
+ graphState.setWorkerCommunications(commService);
+ this.workerContext = BspUtils.createWorkerContext(getConfiguration(),
graphMapper.getGraphState());
}
- public int getPort() {
- return finalRpcPort;
- }
-
public WorkerContext getWorkerContext() {
return workerContext;
}
@@ -182,8 +219,10 @@ public class BspServiceWorker<
CreateMode.EPHEMERAL,
false);
reservedInputSplitPath = inputSplitPath;
- LOG.info("reserveInputSplit: Reserved input split " +
- "path " + reservedInputSplitPath);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("reserveInputSplit: Reserved input " +
+ "split path " + reservedInputSplitPath);
+ }
return reservedInputSplitPath;
} catch (KeeperException.NodeExistsException e) {
LOG.info("reserveInputSplit: Couldn't reserve (already " +
@@ -211,80 +250,11 @@ public class BspServiceWorker<
}
/**
- * Each worker will set the vertex ranges that it has found for a given
- * InputSplit. After this, the InputSplit is considered finished.
- *
- * @param inputSplitPath path to the input split znode
- * @param maxIndexStatMap maps max vertex indexes to a list containing
- * the number of vertices (index 0) and the number of edges (index 1)
- * in each partition (can be null, where nothing is written)
- */
- private void setInputSplitVertexRanges(
- String inputSplitPath,
- Map<I, List<Long>> maxIndexStatMap) {
- String inputSplitFinishedPath =
- inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
- byte [] zkData = null;
- JSONArray statArray = new JSONArray();
- if (maxIndexStatMap != null) {
- for (Map.Entry<I, List<Long>> entry : maxIndexStatMap.entrySet()) {
- try {
- ByteArrayOutputStream outputStream =
- new ByteArrayOutputStream();
- DataOutput output = new DataOutputStream(outputStream);
- ((Writable) entry.getKey()).write(output);
-
- JSONObject vertexRangeObj = new JSONObject();
- vertexRangeObj.put(JSONOBJ_NUM_VERTICES_KEY,
- entry.getValue().get(0));
- vertexRangeObj.put(JSONOBJ_NUM_EDGES_KEY,
- entry.getValue().get(1));
- vertexRangeObj.put(JSONOBJ_HOSTNAME_ID_KEY,
- getHostnamePartitionId());
- vertexRangeObj.put(JSONOBJ_MAX_VERTEX_INDEX_KEY,
- Base64.encodeBytes(
- outputStream.toByteArray()));
- vertexRangeObj.put(JSONOBJ_NUM_MESSAGES_KEY, 0L);
- statArray.put(vertexRangeObj);
- if (LOG.isInfoEnabled()) {
- LOG.info("setInputSplitVertexRanges: " +
- "Trying to add vertexRangeObj " +
- "(max vertex index = " + entry.getKey() + " " +
- vertexRangeObj + " to InputSplit path " +
- inputSplitPath);
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "setInputSplitVertexRanges: Failed to add " +
- "vertex range " + entry.getKey(), e);
- }
- }
- zkData = statArray.toString().getBytes();
- }
- try {
- getZkExt().createExt(inputSplitFinishedPath,
- zkData,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- } catch (KeeperException.NodeExistsException e) {
- LOG.warn("setLocalVertexRanges: " + inputSplitFinishedPath +
- " already exists!");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("setInputSplitVertexRanges: Finished loading " +
- inputSplitPath + " with vertexRanges - " + statArray);
- }
- }
-
- /**
* Load the vertices from the user-defined VertexReader into our partitions
* of vertex ranges. Do this until all the InputSplits have been processed.
* All workers will try to do as many InputSplits as they can. The master
* will monitor progress and stop this once all the InputSplits have been
- * loaded and check-pointed. The InputSplits must be sorted.
+ * loaded and check-pointed.
*
* @throws IOException
* @throws IllegalAccessException
@@ -292,21 +262,22 @@ public class BspServiceWorker<
* @throws ClassNotFoundException
* @throws InterruptedException
*/
- private void loadVertices() throws IOException, ClassNotFoundException,
+ private VertexEdgeCount loadVertices() throws IOException, ClassNotFoundException,
InterruptedException, InstantiationException,
IllegalAccessException {
String inputSplitPath = null;
+ VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
while ((inputSplitPath = reserveInputSplit()) != null) {
- Map<I, List<Long>> maxIndexStatMap =
- loadVerticesFromInputSplit(inputSplitPath);
- setInputSplitVertexRanges(inputSplitPath, maxIndexStatMap);
+ vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+ loadVerticesFromInputSplit(inputSplitPath));
}
+ return vertexEdgeCount;
}
/**
- * Extract vertices from input split, generating mapping of Indices to
- * statistics about the vertices. As a side effect, load the vertices
- * into local global map
+ * Extract vertices from input split, saving them into a mini cache of
+ * partitions. Periodically flush the cache of vertices when a limit is
+ * reached. Mark the input split finished when done.
*
* @param inputSplitPath ZK location of input split
* @return Mapping of vertex indices and statistics, or null if no data read
@@ -316,29 +287,44 @@ public class BspServiceWorker<
* @throws InstantiationException
* @throws IllegalAccessException
*/
- private Map<I, List<Long>> loadVerticesFromInputSplit(String inputSplitPath)
+ private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
throws IOException, ClassNotFoundException, InterruptedException,
InstantiationException, IllegalAccessException {
InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
-
- List<BasicVertex<I, V, E, M>> vertexList =
+ VertexEdgeCount vertexEdgeCount =
readVerticesFromInputSplit(inputSplit);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Got " + vertexList.size() +
- " vertices from input split " + inputSplit);
+ // Flush the remaining cached vertices
+ for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+ inputSplitCache.entrySet()) {
+ if (!entry.getValue().getVertices().isEmpty()) {
+ commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
+ entry.getValue());
+ entry.getValue().getVertices().clear();
+ }
}
+ inputSplitCache.clear();
- if (vertexList.isEmpty()) {
- return null;
+ // Mark this input split done to the master
+ String inputSplitFinishedPath =
+ inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
+ try {
+ getZkExt().createExt(inputSplitFinishedPath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("loadVerticesFromInputSplit: " + inputSplitFinishedPath +
+ " already exists!");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
-
- NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
- getVertexRanges(inputSplit, vertexList);
-
- Map<I, List<Long>> maxIndexStatMap = getMaxIndexStatMap(vertexRangeMap);
-
- return maxIndexStatMap;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadVerticesFromInputSplit: Finished loading " +
+ inputSplitPath + " " + vertexEdgeCount);
+ }
+ return vertexEdgeCount;
}
/**
@@ -374,7 +360,7 @@ public class BspServiceWorker<
((Writable) inputSplit).readFields(inputStream);
if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Reserved " + inputSplitPath +
+ LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
" from ZooKeeper and got input split '" +
inputSplit.toString() + "'");
}
@@ -389,17 +375,18 @@ public class BspServiceWorker<
* @throws IOException
* @throws InterruptedException
*/
- private List<BasicVertex<I, V, E, M>> readVerticesFromInputSplit(
+ private VertexEdgeCount readVerticesFromInputSplit(
InputSplit inputSplit) throws IOException, InterruptedException {
- List<BasicVertex<I, V, E, M>> vertexList =
- new ArrayList<BasicVertex<I, V, E, M>>();
VertexInputFormat<I, V, E, M> vertexInputFormat =
BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
VertexReader<I, V, E, M> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, getContext());
vertexReader.initialize(inputSplit, getContext());
+ long vertexCount = 0;
+ long edgeCount = 0;
while (vertexReader.nextVertex()) {
- BasicVertex<I, V, E, M> readerVertex = vertexReader.getCurrentVertex();
+ BasicVertex<I, V, E, M> readerVertex =
+ vertexReader.getCurrentVertex();
if (readerVertex.getVertexId() == null) {
throw new IllegalArgumentException(
"loadVertices: Vertex reader returned a vertex " +
@@ -409,180 +396,49 @@ public class BspServiceWorker<
readerVertex.setVertexValue(
BspUtils.<V>createVertexValue(getConfiguration()));
}
- // Vertices must be ordered
- if (!vertexList.isEmpty()) {
- @SuppressWarnings("unchecked")
- int compareTo =
- vertexList.get(vertexList.size() - 1).
- getVertexId().compareTo(readerVertex.getVertexId());
- if (compareTo > 0) {
- throw new IllegalArgumentException(
- "loadVertices: Illegal out of order vertices " +
- "from vertex reader previous vertex = " +
- vertexList.get(vertexList.size() - 1) +
- ", next vertex = " + readerVertex);
- }
+ PartitionOwner partitionOwner =
+ workerGraphPartitioner.getPartitionOwner(
+ readerVertex.getVertexId());
+ Partition<I, V, E, M> partition =
+ inputSplitCache.get(partitionOwner);
+ if (partition == null) {
+ partition = new Partition<I, V, E, M>(
+ getConfiguration(),
+ partitionOwner.getPartitionId());
+ inputSplitCache.put(partitionOwner, partition);
+ }
+ partition.putVertex(readerVertex);
+ if (partition.getVertices().size() >= maxVerticesPerPartition) {
+ commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
+ partition);
+ partition.getVertices().clear();
}
- vertexList.add(readerVertex);
+ ++vertexCount;
+ edgeCount += readerVertex.getNumOutEdges();
getContext().progress();
- }
- vertexReader.close();
-
- return vertexList;
- }
- /**
- * Partition the vertices into their respective VertexRanges. The number of
- * vertex ranges may be up to half of the number of available workers and
- * must reach a minimum size.
- *
- * @param inputSplit inputSplit for these vertices
- * @param vertexList vertices from which to build the ranges
- * @return Ordered map of maxi vertex ID in range <-> vertices within range
- * @throws InstantiationException
- * @throws IllegalAccessException
- * @throws IOException
- */
- private NavigableMap<I, VertexRange<I, V, E, M>> getVertexRanges(
- InputSplit inputSplit, List<BasicVertex<I, V, E, M>> vertexList)
- throws InstantiationException, IllegalAccessException, IOException {
-
- NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
- new TreeMap<I, VertexRange<I, V, E, M>>();
- long vertexRangesPerInputSplit = getVertexRangesPerInputSplit(inputSplit);
- long vertexRangeSize = vertexList.size() / vertexRangesPerInputSplit;
- long minPerVertexRange =
- getConfiguration().getLong(
- GiraphJob.MIN_VERTICES_PER_RANGE,
- GiraphJob.MIN_VERTICES_PER_RANGE_DEFAULT);
- if (vertexRangeSize < minPerVertexRange) {
- vertexRangeSize = minPerVertexRange;
- }
- I vertexIdMax = null;
-
- // Identify the endpoints of the ranges, create empty placeholders
- for (int i = 0; i < vertexList.size(); ++i) {
- if ((vertexIdMax != null) && ((i % vertexRangeSize) == 0)) {
- VertexRange<I, V, E, M> vertexRange =
- new VertexRange<I, V, E, M>(
- null, -1, null, vertexIdMax, null);
- vertexRangeMap.put(vertexIdMax, vertexRange);
- vertexIdMax = null;
- }
-
- if (vertexIdMax == null) {
- vertexIdMax = vertexList.get(i).getVertexId();
- } else {
- @SuppressWarnings("unchecked")
- int compareTo =
- vertexList.get(i).getVertexId().compareTo(vertexIdMax);
- if (compareTo > 0) {
- vertexIdMax = vertexList.get(i).getVertexId();
- }
- }
- }
- if (vertexIdMax == null) {
- throw new RuntimeException("loadVertices: Encountered " +
- "impossible null vertexIdMax.");
- }
- VertexRange<I, V, E, M> finalRange =
- new VertexRange<I, V, E, M>(null, -1, null, vertexIdMax, null);
- vertexRangeMap.put(vertexIdMax, finalRange);
-
- // Now iterate over the defined ranges, placing each vertex in its range
- Iterator<I> maxIndexVertexMapIt = vertexRangeMap.keySet().iterator();
- I currentVertexIndexMax = maxIndexVertexMapIt.next();
- for (BasicVertex<I, V, E, M> vertex : vertexList) {
- @SuppressWarnings("unchecked")
- int compareTo = vertex.getVertexId().compareTo(currentVertexIndexMax);
- if (compareTo > 0) {
- if (!maxIndexVertexMapIt.hasNext()) {
- throw new RuntimeException(
- "loadVertices: Impossible that vertex " +
- vertex.getVertexId() + " > " +
- currentVertexIndexMax);
+ ++totalVerticesLoaded;
+ totalEdgesLoaded += readerVertex.getNumOutEdges();
+ // Update status every half a million vertices
+ if ((totalVerticesLoaded % 500000) == 0) {
+ String status = "readVerticesFromInputSplit: Loaded " +
+ totalVerticesLoaded + " vertices and " +
+ totalEdgesLoaded + " edges " +
+ ", totalMem = " + Runtime.getRuntime().totalMemory() +
+ " maxMem =" + Runtime.getRuntime().maxMemory() +
+ " freeMem=" + Runtime.getRuntime().freeMemory() + " " +
+ getGraphMapper().getMapFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep();
+ if (LOG.isInfoEnabled()) {
+ LOG.info(status);
}
- currentVertexIndexMax = maxIndexVertexMapIt.next();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadVertices: Adding vertex with index = " +
- vertex.getVertexId() + " to vertex range max = " +
- currentVertexIndexMax);
- }
- VertexRange<I, V, E, M> range =
- vertexRangeMap.get(currentVertexIndexMax);
- SortedMap<I, BasicVertex<I, V, E, M>> vertexMap = range.getVertexMap();
- if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
- throw new IllegalStateException(
- "loadVertices: Already contains vertex " +
- vertex.toString() + " in vertex range max " +
- currentVertexIndexMax);
+ getContext().setStatus(status);
}
}
- return vertexRangeMap;
- }
-
- /**
- * Determine the number of VertexRanges per InputSplit.
- *
- * @param inputSplit inputSplit to generate number for
- * @return number of VertexRanges per split
- */
- private long getVertexRangesPerInputSplit(InputSplit inputSplit) {
- // ZooKeeper has a limit of the data in a single znode of 1 MB and
- // each entry can go be on the average somewhat more than 300 bytes
- final long
- maxVertexRangesPerInputSplit = 1024 * 1024 / 350 / inputSplitCount;
-
- long vertexRangesPerInputSplit = (long) (inputSplitCount *
- getConfiguration().getFloat(
- GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
- GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT));
- if (vertexRangesPerInputSplit == 0) {
- vertexRangesPerInputSplit = 1;
- } else if (vertexRangesPerInputSplit > maxVertexRangesPerInputSplit) {
- LOG.warn("loadVertices: Using " + maxVertexRangesPerInputSplit +
- " instead of " + vertexRangesPerInputSplit +
- " vertex ranges on input split " + inputSplit);
- vertexRangesPerInputSplit = maxVertexRangesPerInputSplit;
- }
- return vertexRangesPerInputSplit;
- }
-
- /**
- * Generate a mapping of max vertex indices to two-element lists consisting
- * of the number of vertices and the number of edges in each partition.
- * Additionally, adds entries of the vertexRangeMap to local map.
- *
- * @param vertexRangeMap Mapping of indices to VertexRanges
- * @return mapping of vertices to stats, or null if no vertices
- * are written for the partition
- */
- private Map<I, List<Long>> getMaxIndexStatMap(
- NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap) {
- Map<I, List<Long>> maxIndexStatMap = new TreeMap<I, List<Long>>();
-
- for (Entry<I, VertexRange<I, V, E, M>> entry :
- vertexRangeMap.entrySet()) {
- List<Long> statList = new ArrayList<Long>(2);
- long vertexRangeEdgeCount = 0;
- for (BasicVertex<I, V, E, M> vertex :
- entry.getValue().getVertexMap().values()) {
- vertexRangeEdgeCount += vertex.getNumOutEdges();
- }
- statList.add(Long.valueOf(entry.getValue().getVertexMap().size()));
- statList.add(Long.valueOf(vertexRangeEdgeCount));
- if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Got " + statList.get(0) +
- " vertices and " + statList.get(1) +
- " edges from vertex range max index " + entry.getKey());
- }
- maxIndexStatMap.put(entry.getKey(), statList);
+ vertexReader.close();
- // Add the local vertex ranges to the stored vertex ranges
- getStorableVertexRangeMap().put(entry.getKey(), entry.getValue());
- }
- return maxIndexStatMap;
+ return new VertexEdgeCount(vertexCount, edgeCount);
}
@Override
@@ -620,7 +476,13 @@ public class BspServiceWorker<
}
}
- startSuperstep();
+ // Add the partitions that this worker owns
+ Collection<? extends PartitionOwner> masterSetPartitionOwners =
+ startSuperstep();
+
+ workerGraphPartitioner.updatePartitionOwners(
+ getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+ commService.setup();
// Ensure the InputSplits are ready for processing before processing
while (true) {
@@ -643,22 +505,50 @@ public class BspServiceWorker<
}
getContext().progress();
+
try {
- loadVertices();
+ VertexEdgeCount vertexEdgeCount = loadVertices();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Finally loaded a total of " +
+ vertexEdgeCount);
+ }
} catch (Exception e) {
LOG.error("setup: loadVertices failed - ", e);
throw new IllegalStateException("setup: loadVertices failed", e);
}
+ getContext().progress();
- long workerVertices = 0;
- long workerEdges = 0;
- for (VertexRange<I, V, E, M> vertexRange :
- getStorableVertexRangeMap().values()) {
- workerVertices += vertexRange.getVertexCount();
- workerEdges += vertexRange.getEdgeCount();
+ // At this point all vertices have been sent to their destinations.
+ // Move them to the worker, creating the empty partitions
+ movePartitionsToWorker(commService);
+ for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+ if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+ !getPartitionMap().containsKey(
+ partitionOwner.getPartitionId())) {
+ Partition<I, V, E, M> partition =
+ new Partition<I, V, E, M>(getConfiguration(),
+ partitionOwner.getPartitionId());
+ getPartitionMap().put(partitionOwner.getPartitionId(),
+ partition);
+ }
}
- finishSuperstep(0, workerVertices, workerEdges, 0);
+ // Generate the partition stats for the input superstep and process
+ // if necessary
+ List<PartitionStats> partitionStatsList =
+ new ArrayList<PartitionStats>();
+ for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
+ PartitionStats partitionStats =
+ new PartitionStats(partition.getPartitionId(),
+ partition.getVertices().size(),
+ 0,
+ partition.getEdgeCount());
+ partitionStatsList.add(partitionStats);
+ }
+ workerGraphPartitioner.finalizePartitionStats(
+ partitionStatsList, workerPartitionMap);
+
+ finishSuperstep(partitionStatsList, 0);
}
/**
@@ -753,9 +643,9 @@ public class BspServiceWorker<
new DataInputStream(input));
aggregator.setAggregatedValue(aggregatorValue);
if (LOG.isDebugEnabled()) {
- LOG.info("getAggregatorValues: " +
- "Got aggregator=" + aggregatorName + " value=" +
- aggregatorValue);
+ LOG.debug("getAggregatorValues: " +
+ "Got aggregator=" + aggregatorName + " value=" +
+ aggregatorValue);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -777,25 +667,25 @@ public class BspServiceWorker<
JSONArray hostnamePort = new JSONArray();
hostnamePort.put(getHostname());
- hostnamePort.put(finalRpcPort);
+ hostnamePort.put(workerInfo.getPort());
String myHealthPath = null;
if (isHealthy()) {
- myHealthPath = getWorkerHealthyPath(getApplicationAttempt(),
- getSuperstep());
+ myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+ getSuperstep());
}
else {
- myHealthPath = getWorkerUnhealthyPath(getApplicationAttempt(),
- getSuperstep());
+ myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+ getSuperstep());
}
- myHealthPath = myHealthPath + "/" + getHostnamePartitionId();
+ myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
try {
- myHealthZnode =
- getZkExt().createExt(myHealthPath,
- hostnamePort.toString().getBytes(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL,
- true);
+ myHealthZnode = getZkExt().createExt(
+ myHealthPath,
+ WritableUtils.writeToByteArray(workerInfo),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL,
+ true);
} catch (KeeperException.NodeExistsException e) {
LOG.warn("registerHealth: myHealthPath already exists (likely " +
"from previous failure): " + myHealthPath +
@@ -816,51 +706,74 @@ public class BspServiceWorker<
LOG.info("registerHealth: Created my health node for attempt=" +
getApplicationAttempt() + ", superstep=" +
getSuperstep() + " with " + myHealthZnode +
- " and hostnamePort = " + hostnamePort.toString());
+ " and workerInfo= " + workerInfo);
}
}
- public boolean startSuperstep() {
+ @Override
+ public Collection<? extends PartitionOwner> startSuperstep() {
// Algorithm:
- // 1. Register my health for the next superstep.
- // 2. Wait until the vertex range assignment is complete (unless
- // superstep 0).
+ // 1. Communication service will combine messages from previous
+ // superstep
+ // 2. Register my health for the next superstep.
+ // 3. Wait until the partition assignment is complete and get it
+ // 4. Get the aggregator values from the previous superstep
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ commService.prepareSuperstep();
+ }
+
registerHealth(getSuperstep());
- String vertexRangeAssignmentsNode = null;
- if (getSuperstep() > INPUT_SUPERSTEP) {
- vertexRangeAssignmentsNode =
- getVertexRangeAssignmentsPath(getApplicationAttempt(),
- getSuperstep());
- try {
- while (getZkExt().exists(vertexRangeAssignmentsNode, true) ==
- null) {
- getVertexRangeAssignmentsReadyChangedEvent().waitForever();
- getVertexRangeAssignmentsReadyChangedEvent().reset();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("startSuperstep: Ready for computation on superstep " +
- getSuperstep() + " since worker " +
- "selection and vertex range assignments are done in " +
- vertexRangeAssignmentsNode);
- }
+ String partitionAssignmentsNode =
+ getPartitionAssignmentsPath(getApplicationAttempt(),
+ getSuperstep());
+ Collection<? extends PartitionOwner> masterSetPartitionOwners;
+ try {
+ while (getZkExt().exists(partitionAssignmentsNode, true) ==
+ null) {
+ getPartitionAssignmentsReadyChangedEvent().waitForever();
+ getPartitionAssignmentsReadyChangedEvent().reset();
+ }
+ List<? extends Writable> writableList =
+ WritableUtils.readListFieldsFromZnode(
+ getZkExt(),
+ partitionAssignmentsNode,
+ false,
+ null,
+ workerGraphPartitioner.createPartitionOwner().getClass(),
+ getConfiguration());
+
+ @SuppressWarnings("unchecked")
+ Collection<? extends PartitionOwner> castedWritableList =
+ (Collection<? extends PartitionOwner>) writableList;
+ masterSetPartitionOwners = castedWritableList;
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "startSuperstep: KeeperException getting assignments", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "startSuperstep: InterruptedException getting assignments", e);
}
- getAggregatorValues(getSuperstep());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("startSuperstep: Ready for computation on superstep " +
+ getSuperstep() + " since worker " +
+ "selection and vertex range assignments are done in " +
+ partitionAssignmentsNode);
+ }
+
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ getAggregatorValues(getSuperstep());
+ }
getContext().setStatus("startSuperstep: " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
- return true;
+ return masterSetPartitionOwners;
}
@Override
- public boolean finishSuperstep(long workerFinishedVertices,
- long workerVertices,
- long workerEdges,
+ public boolean finishSuperstep(List<PartitionStats> partitionStatsList,
long workersSentMessages) {
// This barrier blocks until success (or the master signals it to
// restart).
@@ -872,18 +785,27 @@ public class BspServiceWorker<
// of this worker
// 3. Let the master know it is finished.
// 4. Then it waits for the master to say whether to stop or not.
+ try {
+ commService.flush(getContext());
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "finishSuperstep: flush failed", e);
+ }
JSONArray aggregatorValueArray =
marshalAggregatorValues(getSuperstep());
+ Collection<PartitionStats> finalizedPartitionStats =
+ workerGraphPartitioner.finalizePartitionStats(
+ partitionStatsList, workerPartitionMap);
+ List<PartitionStats> finalizedPartitionStatsList =
+ new ArrayList<PartitionStats>(finalizedPartitionStats);
+ byte [] partitionStatsBytes =
+ WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
JSONObject workerFinishedInfoObj = new JSONObject();
try {
workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
aggregatorValueArray);
- workerFinishedInfoObj.put(JSONOBJ_FINISHED_VERTICES_KEY,
- workerFinishedVertices);
- workerFinishedInfoObj.put(JSONOBJ_NUM_VERTICES_KEY,
- workerVertices);
- workerFinishedInfoObj.put(JSONOBJ_NUM_EDGES_KEY,
- workerEdges);
+ workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+ Base64.encodeBytes(partitionStatsBytes));
workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
workersSentMessages);
} catch (JSONException e) {
@@ -905,54 +827,46 @@ public class BspServiceWorker<
throw new RuntimeException(e);
}
+ getContext().setStatus("finishSuperstep: (waiting for rest " +
+ "of workers) " +
+ getGraphMapper().getMapFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+
String superstepFinishedNode =
getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
- JSONObject globalStatsObject = null;
try {
while (getZkExt().exists(superstepFinishedNode, true) == null) {
getSuperstepFinishedEvent().waitForever();
getSuperstepFinishedEvent().reset();
}
- globalStatsObject = new JSONObject(
- new String(getZkExt().getData(superstepFinishedNode,
- false,
- null)));
- } catch (Exception e) {
- throw new RuntimeException(
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "finishSuperstep: Failed while waiting for master to " +
+ "signal completion of superstep " + getSuperstep(), e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
"finishSuperstep: Failed while waiting for master to " +
"signal completion of superstep " + getSuperstep(), e);
}
- long finishedVertices =
- globalStatsObject.optLong(JSONOBJ_FINISHED_VERTICES_KEY);
- totalVertices =
- globalStatsObject.optLong(JSONOBJ_NUM_VERTICES_KEY);
- totalEdges =
- globalStatsObject.optLong(JSONOBJ_NUM_EDGES_KEY);
- long sentMessages =
- globalStatsObject.optLong(JSONOBJ_NUM_MESSAGES_KEY);
+ GlobalStats globalStats = new GlobalStats();
+ WritableUtils.readFieldsFromZnode(
+ getZkExt(), superstepFinishedNode, false, null, globalStats);
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
- " with total finished vertices = " + finishedVertices +
- " of out total vertices = " + totalVertices +
- ", total edges = " + totalEdges + ", sent messages = " +
- sentMessages);
+ " with global stats " + globalStats);
}
incrCachedSuperstep();
- getContext().setStatus("finishSuperstep: " +
+ getContext().setStatus("finishSuperstep: (all workers done) " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
- return ((finishedVertices == totalVertices) && (sentMessages == 0));
- }
-
- @Override
- public long getTotalVertices() {
- return totalVertices;
- }
-
- @Override
- public long getTotalEdges() {
- return totalEdges;
+ getGraphMapper().getGraphState().
+ setNumEdges(globalStats.getEdgeCount()).
+ setNumVertices(globalStats.getVertexCount());
+ return ((globalStats.getFinishedVertexCount() ==
+ globalStats.getVertexCount()) &&
+ (globalStats.getMessageCount() == 0));
}
/**
@@ -973,10 +887,8 @@ public class BspServiceWorker<
VertexWriter<I, V, E> vertexWriter =
vertexOutputFormat.createVertexWriter(getContext());
vertexWriter.initialize(getContext());
- for (Map.Entry<I, VertexRange<I, V, E, M>> entry :
- getVertexRangeMap().entrySet()) {
- for (BasicVertex<I, V, E, M> vertex :
- entry.getValue().getVertexMap().values()) {
+ for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+ for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
vertexWriter.writeVertex(vertex);
}
}
@@ -985,6 +897,7 @@ public class BspServiceWorker<
@Override
public void cleanup() throws IOException, InterruptedException {
+ commService.closeConnections();
setCachedSuperstep(getSuperstep() - 1);
saveVertices();
// All worker processes should denote they are done by adding special
@@ -1024,27 +937,20 @@ public class BspServiceWorker<
// cleanup phase -- just log the error
LOG.error("cleanup: Zookeeper failed to close with " + e);
}
- }
-
- @Override
- public VertexRange<I, V, E, M> getVertexRange(long superstep, I index) {
- I maxVertexIndex = getVertexRangeMap(superstep).ceilingKey(index);
- if (maxVertexIndex == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("getVertexRange: no partition for " +
- "destination vertex " +
- index + " -- returning last partition");
- }
- return getVertexRangeMap(superstep).lastEntry().getValue();
- }
- else {
- return getVertexRangeMap(superstep).get(maxVertexIndex);
- }
+ // Preferably would shut down the service only after
+ // all clients have disconnected (or the exceptions on the
+ // client side ignored).
+ commService.close();
}
@Override
public void storeCheckpoint() throws IOException {
+ getContext().setStatus("storeCheckpoint: Starting checkpoint " +
+ getGraphMapper().getMapFunctions().toString() +
+ " - Attempt=" + getApplicationAttempt() +
+ ", Superstep=" + getSuperstep());
+
// Algorithm:
// For each partition, dump vertices and messages
Path metadataFilePath =
@@ -1060,91 +966,46 @@ public class BspServiceWorker<
getHostnamePartitionId() +
CHECKPOINT_VALID_POSTFIX);
- // Remove these files if they already exist
- try {
- getFs().delete(validFilePath, false);
- LOG.warn("storeCheckpoint: Removed file " + validFilePath);
- } catch (IOException e) {
+ // Remove these files if they already exist (they shouldn't, unless
+ // a previous failure of this worker left them behind)
+ if (getFs().delete(validFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed valid file " +
+ validFilePath);
+ }
+ if (getFs().delete(metadataFilePath, false)) {
+ LOG.warn("storeCheckpoint: Removed metadata file " +
+ metadataFilePath);
}
- try {
- getFs().delete(metadataFilePath, false);
- LOG.warn("storeCheckpoint: Removed file " + metadataFilePath);
- } catch (IOException e) {
- }
- try {
- getFs().delete(verticesFilePath, false);
+ if (getFs().delete(verticesFilePath, false)) {
LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
- } catch (IOException e) {
}
FSDataOutputStream verticesOutputStream =
getFs().create(verticesFilePath);
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
- long workerVertexRanges = 0;
- for (Map.Entry<I, VertexRange<I, V, E, M>> entry :
- getVertexRangeMap().entrySet()) {
- // Only write out the partitions the worker is responsible for
- if (!entry.getValue().getHostnameId().equals(
- getHostnamePartitionId())) {
- continue;
- }
-
- ++workerVertexRanges;
- // Write the vertices (index, data, edges and messages)
- // Format:
- // <vertex count>
- // <v0 id><v0 value>
- // <v0 num edges>
- // <v0 edge 0 dest><v0 edge 0 value>
- // <v0 edge 1 dest><v0 edge 1 value>...
- // <v0 message count>
- // <v0 msg 0><v0 msg 1>...
+ for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
long startPos = verticesOutputStream.getPos();
- verticesOutputStream.writeLong(
- entry.getValue().getVertexMap().size());
- for (BasicVertex<I, V, E, M> vertex :
- entry.getValue().getVertexMap().values()) {
- ByteArrayOutputStream vertexByteStream =
- new ByteArrayOutputStream();
- DataOutput vertexOutput =
- new DataOutputStream(vertexByteStream);
- vertex.write(vertexOutput);
- verticesOutputStream.write(vertexByteStream.toByteArray());
- if (LOG.isDebugEnabled()) {
- LOG.debug("storeCheckpoint: Wrote vertex id = " +
- vertex.getVertexId() + " with " +
- vertex.getNumOutEdges() + " edges and " +
- vertex.getMsgList().size() + " messages (" +
- vertexByteStream.size() + " total bytes)");
- }
- }
- // Write the metadata for this vertex range
+ partition.write(verticesOutputStream);
+ // Write the metadata for this partition
// Format:
// <index count>
- // <index 0 start pos><# vertices><# edges><max index 0>
- // <index 1 start pos><# vertices><# edges><max index 1>...
+ // <index 0 start pos><partition id>
+ // <index 1 start pos><partition id>
metadataOutput.writeLong(startPos);
- metadataOutput.writeLong(entry.getValue().getVertexMap().size());
- long edgeCount = 0;
- for (BasicVertex<I, V, E, M> vertex :
- entry.getValue().getVertexMap().values()) {
- edgeCount += vertex.getNumOutEdges();
- }
- metadataOutput.writeLong(edgeCount);
- entry.getKey().write(metadataOutput);
+ metadataOutput.writeInt(partition.getPartitionId());
if (LOG.isDebugEnabled()) {
LOG.debug("storeCheckpoint: Vertex file starting " +
"offset = " + startPos + ", length = " +
(verticesOutputStream.getPos() - startPos) +
- ", max index of vertex range = " + entry.getKey());
+ ", partition = " + partition.toString());
}
}
// Metadata is buffered and written at the end since it's small and
- // needs to know how many vertex ranges this worker owns
+ // needs to know how many partitions this worker owns
FSDataOutputStream metadataOutputStream =
getFs().create(metadataFilePath);
- metadataOutputStream.writeLong(workerVertexRanges);
+ metadataOutputStream.writeInt(workerPartitionMap.size());
metadataOutputStream.write(metadataByteStream.toByteArray());
metadataOutputStream.close();
verticesOutputStream.close();
@@ -1157,246 +1018,254 @@ public class BspServiceWorker<
getFs().createNewFile(validFilePath);
}
- /**
- * Load a single vertex range from checkpoint files.
- *
- * @param maxIndex denotes the vertex range
- * @param dataFileName name of the data file
- * @param startPos position to start from in data file
- * @throws IOException
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- private void loadVertexRange(I maxIndex,
- String dataFileName,
- long startPos)
- throws IOException, InstantiationException, IllegalAccessException {
- // Read in the reverse order from storeCheckpoint()
- DataInputStream dataStream = getFs().open(new Path(dataFileName));
- if (dataStream.skip(startPos) != startPos) {
- throw new IllegalStateException(
- "loadVertexRange: Failed to skip " + startPos);
- }
- long vertexCount = dataStream.readLong();
- VertexRange<I, V, E, M> vertexRange = getVertexRangeMap().get(maxIndex);
- for (int i = 0; i < vertexCount; ++i) {
- BasicVertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(getConfiguration());
- vertex.setGraphState(getGraphMapper().getGraphState());
- vertex.readFields(dataStream);
- // Add the vertex
- if (vertexRange.getVertexMap().put(vertex.getVertexId(), vertex)
- != null) {
- throw new IllegalStateException(
- "loadVertexRange: Vertex " + vertex + " already exists");
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadVertexRange: " + vertexCount + " vertices in " +
- dataFileName);
- }
- dataStream.close();
- }
-
@Override
public void loadCheckpoint(long superstep) {
// Algorithm:
- // Check all the vertex ranges for this worker and load the ones
- // that match my hostname and id.
- I maxVertexIndex = BspUtils.<I>createVertexIndex(getConfiguration());
- long startPos = -1;
- long vertexRangeCount = -1;
- for (VertexRange<I, V, E, M> vertexRange :
- getVertexRangeMap().values()) {
- if (vertexRange.getHostnameId()
- .compareTo(getHostnamePartitionId()) == 0) {
+ // Examine all the partition owners and load the ones
+ // that match my hostname and id from the master designated checkpoint
+ // prefixes.
+ long startPos = 0;
+ int loadedPartitions = 0;
+ for (PartitionOwner partitionOwner :
+ workerGraphPartitioner.getPartitionOwners()) {
+ if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
String metadataFile =
- vertexRange.getCheckpointFilePrefix() +
+ partitionOwner.getCheckpointFilesPrefix() +
CHECKPOINT_METADATA_POSTFIX;
+ String partitionsFile =
+ partitionOwner.getCheckpointFilesPrefix() +
+ CHECKPOINT_VERTICES_POSTFIX;
try {
+ int partitionId = -1;
DataInputStream metadataStream =
getFs().open(new Path(metadataFile));
- vertexRangeCount = metadataStream.readLong();
- for (int i = 0; i < vertexRangeCount; ++i) {
+ int partitions = metadataStream.readInt();
+ for (int i = 0; i < partitions; ++i) {
startPos = metadataStream.readLong();
- // Skip the vertex count
- metadataStream.readLong();
- // Skip the edge count
- metadataStream.readLong();
- maxVertexIndex.readFields(metadataStream);
- @SuppressWarnings("unchecked")
- int compareTo =
- vertexRange.getMaxIndex().compareTo(maxVertexIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadCheckpoint: Comparing " +
- vertexRange.getMaxIndex() + " and " +
- maxVertexIndex + " = " + compareTo);
- }
- if (compareTo == 0) {
- loadVertexRange(
- vertexRange.getMaxIndex(),
- vertexRange.getCheckpointFilePrefix() +
- CHECKPOINT_VERTICES_POSTFIX,
- startPos);
+ partitionId = metadataStream.readInt();
+ if (partitionId == partitionOwner.getPartitionId()) {
+ break;
}
}
+ if (partitionId != partitionOwner.getPartitionId()) {
+ throw new IllegalStateException(
+ "loadCheckpoint: " + partitionOwner +
+ " not found!");
+ }
metadataStream.close();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ Partition<I, V, E, M> partition =
+ new Partition<I, V, E, M>(
+ getConfiguration(),
+ partitionId);
+ DataInputStream partitionsStream =
+ getFs().open(new Path(partitionsFile));
+ if (partitionsStream.skip(startPos) != startPos) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Failed to skip " + startPos +
+ " on " + partitionsFile);
+ }
+ partition.readFields(partitionsStream);
+ partitionsStream.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded partition " +
+ partition);
+ }
+ if (getPartitionMap().put(partitionId, partition) != null) {
+ throw new IllegalStateException(
+ "loadCheckpoint: Already has partition owner " +
+ partitionOwner);
+ }
+ ++loadedPartitions;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoing: Failed to get partition owner " +
+ partitionOwner, e);
}
}
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+ " partitions of out " +
+ workerGraphPartitioner.getPartitionOwners().size() +
+ " total.");
+ }
+ // Communication service needs to setup the connections prior to
+ // processing vertices
+ commService.setup();
}
- @Override
- public final void exchangeVertexRanges() {
- boolean syncRequired = false;
- for (Entry<I, VertexRange<I, V, E, M>> entry :
- getVertexRangeMap().entrySet()) {
- final int previousPort = entry.getValue().getPreviousPort();
- final String previousHostname =
- entry.getValue().getPreviousHostname();
- final int port = entry.getValue().getPort();
- final String hostname = entry.getValue().getHostname();
- if (LOG.isDebugEnabled()) {
- LOG.debug("exchangeVertexRanges: For max index " +
- entry.getKey() + ", count " +
- entry.getValue().getVertexMap().size() +
- ", has previous port " +
- previousPort + ", previous hostname "
- + previousHostname +
- ", port " + port + ", hostname " + hostname);
- }
- if (previousPort == -1) {
- continue;
- }
-
- if ((previousPort == finalRpcPort) &&
- getHostname().equals(previousHostname) &&
- ((port != finalRpcPort) ||
- !(getHostname().equals(hostname)))) {
- if (!syncRequired) {
- getGraphMapper().getWorkerCommunications().
- cleanCachedVertexAddressMap();
- }
- List<BasicVertex<I, V, E, M>> vertexList =
- new ArrayList<BasicVertex<I, V, E, M>>(
- entry.getValue().getVertexMap().values());
- if (vertexList != null) {
- LOG.info("exchangeVertexRanges: Sending vertex range " +
- entry.getKey() + " with " +
- vertexList.size() + " elements to " + hostname +
- ":" + port);
- getGraphMapper().getWorkerCommunications().sendVertexListReq(
- entry.getKey(), vertexList);
- vertexList.clear();
- entry.getValue().getVertexMap().clear();
- if (LOG.isInfoEnabled()) {
- LOG.info("exchangeVertexRanges: Sent vertex range " +
- entry.getKey() + " with " +
- vertexList.size() +
- " elements to " + hostname +
- ":" + port + " " + vertexList.size() + " " +
- entry.getValue().getVertexMap().size());
- }
+ /**
+ * Send the worker partitions to their destination workers
+ *
+ * @param workerPartitionMap Map of worker info to the partitions stored
+ * on this worker to be sent
+ */
+ private void sendWorkerPartitions(
+ Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+ List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+ new ArrayList<Entry<WorkerInfo, List<Integer>>>(
+ workerPartitionMap.entrySet());
+ Collections.shuffle(randomEntryList);
+ for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+ randomEntryList) {
+ for (Integer partitionId : workerPartitionList.getValue()) {
+ Partition<I, V, E, M> partition =
+ getPartitionMap().get(partitionId);
+ if (partition == null) {
+ throw new IllegalStateException(
+ "sendWorkerPartitions: Couldn't find partition " +
+ partitionId + " to send to " +
+ workerPartitionList.getKey());
}
- syncRequired = true;
- }
- else if ((port == finalRpcPort) &&
- getHostname().equals(hostname) &&
- ((previousPort != finalRpcPort) ||
- !(getHostname().equals(previousHostname)))) {
if (LOG.isInfoEnabled()) {
- LOG.info("exchangeVertexRanges: Receiving " +
- entry.getKey() + " from " +
- previousHostname + ":" + previousPort);
- }
- if (!syncRequired) {
- getGraphMapper().getWorkerCommunications().
- cleanCachedVertexAddressMap();
- }
- VertexRange<I, V, E, M> destVertexRange =
- getVertexRangeMap().get(entry.getKey());
- if ((destVertexRange.getVertexMap() != null) &&
- !destVertexRange.getVertexMap().isEmpty()) {
- throw new RuntimeException(
- "exchangeVertexRanges: Cannot receive max index " +
- entry.getKey() + " since already have " +
- destVertexRange.getVertexMap().size() + " elements.");
+ LOG.info("sendWorkerPartitions: Sending worker " +
+ workerPartitionList.getKey() + " partition " +
+ partitionId);
}
- syncRequired = true;
+ getGraphMapper().getGraphState().getWorkerCommunications().
+ sendPartitionReq(workerPartitionList.getKey(),
+ partition);
+ getPartitionMap().remove(partitionId);
}
}
- // All senders and receivers must agree they are finished
- if (syncRequired) {
- String myVertexRangeExchangePath =
- getVertexRangeExchangePath(getApplicationAttempt(),
- getSuperstep()) +
- "/" + getHostnamePartitionId();
- String vertexRangeExchangeFinishedPath =
- getVertexRangeExchangeFinishedPath(getApplicationAttempt(),
- getSuperstep());
- LOG.info("exchangeVertexRanges: Ready with path " +
- myVertexRangeExchangePath);
- try {
- getZkExt().createExt(myVertexRangeExchangePath,
- null,
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT,
- true);
- LOG.info("exchangeVertexRanges: Waiting on change to " +
- vertexRangeExchangeFinishedPath);
- while (getZkExt().exists(vertexRangeExchangeFinishedPath, true)
- == null) {
- getVertexRangeExchangeFinishedChangedEvent().waitForever();
- getVertexRangeExchangeFinishedChangedEvent().reset();
- }
- } catch (KeeperException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ String myPartitionExchangeDonePath =
+ getPartitionExchangeWorkerPath(
+ getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+ try {
+ getZkExt().createExt(myPartitionExchangeDonePath,
+ null,
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "sendWorkerPartitions: KeeperException to create " +
+ myPartitionExchangeDonePath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "sendWorkerPartitions: InterruptedException to create " +
+ myPartitionExchangeDonePath, e);
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("sendWorkerPartitions: Done sending all my partitions.");
+ }
+ }
- // Add the vertices that were sent earlier.
- Map<I, List<BasicVertex<I, V, E, M>>> inVertexRangeMap =
- getGraphMapper().getWorkerCommunications().getInVertexRangeMap();
- synchronized (inVertexRangeMap) {
- for (Entry<I, List<BasicVertex<I, V, E, M>>> entry :
- inVertexRangeMap.entrySet()) {
- if (entry.getValue() == null || entry.getValue().isEmpty()) {
- continue;
- }
+ @Override
+ public final void exchangeVertexPartitions(
+ Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+ // 1. Fix the addresses of the partition ids if they have changed.
+ // 2. Send all the partitions to their destination workers in a random
+ // fashion.
+ // 3. Notify completion with a ZooKeeper stamp
+ // 4. Wait for all my dependencies to be done (if any)
+ // 5. Add the partitions to myself.
+ PartitionExchange partitionExchange =
+ workerGraphPartitioner.updatePartitionOwners(
+ getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+ commService.fixPartitionIdToSocketAddrMap();
+
+ Map<WorkerInfo, List<Integer>> workerPartitionMap =
+ partitionExchange.getSendWorkerPartitionMap();
+ if (!workerPartitionMap.isEmpty()) {
+ sendWorkerPartitions(workerPartitionMap);
+ }
+
+ Set<WorkerInfo> myDependencyWorkerSet =
+ partitionExchange.getMyDependencyWorkerSet();
+ Set<String> workerIdSet = new HashSet<String>();
+ for (WorkerInfo workerInfo : myDependencyWorkerSet) {
+ if (workerIdSet.add(workerInfo.getHostnameId()) != true) {
+ throw new IllegalStateException(
+ "exchangeVertexPartitions: Duplicate entry " + workerInfo);
+ }
+ }
+ if (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
+ "exiting early");
+ }
+ return;
+ }
- SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
- getVertexRangeMap().get(entry.getKey()).getVertexMap();
- if (vertexMap.size() != 0) {
- throw new RuntimeException(
- "exchangeVertexRanges: Failed to import vertex range " +
- entry.getKey() + " of size " + entry.getValue().size() +
- " since it is already of size " + vertexMap.size() +
- " but should be empty!");
+ String vertexExchangePath =
+ getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+ List<String> workerDoneList;
+ try {
+ while (true) {
+ workerDoneList = getZkExt().getChildrenExt(
+ vertexExchangePath, true, false, false);
+ workerIdSet.removeAll(workerDoneList);
+ if (workerIdSet.isEmpty()) {
+ break;
}
if (LOG.isInfoEnabled()) {
- LOG.info("exchangeVertexRanges: Adding " +
- entry.getValue().size() +
- " vertices for max index " + entry.getKey());
+ LOG.info("exchangeVertexPartitions: Waiting for workers " +
+ workerIdSet);
}
+ getPartitionExchangeChildrenChangedEvent().waitForever();
+ getPartitionExchangeChildrenChangedEvent().reset();
+ }
+ } catch (KeeperException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("exchangeVertexPartitions: Done with exchange.");
+ }
+
+ // Add the partitions sent earlier
+ movePartitionsToWorker(commService);
+ }
+
+ /**
+ * Partitions that are exchanged need to be moved from the communication
+ * service to the worker.
+ *
+ * @param commService Communication service where the partitions are
+ * temporarily stored.
+ */
+ private void movePartitionsToWorker(
+ ServerInterface<I, V, E, M> commService) {
+ Map<Integer, List<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
+ commService.getInPartitionVertexMap();
+ synchronized (inPartitionVertexMap) {
+ for (Entry<Integer, List<BasicVertex<I, V, E, M>>> entry :
+ inPartitionVertexMap.entrySet()) {
+ if (getPartitionMap().containsKey(entry.getKey())) {
+ throw new IllegalStateException(
+ "moveVerticesToWorker: Already has partition " +
+ entry.getKey());
+ }
+
+ Partition<I, V, E, M> tmpPartition = new Partition<I, V, E, M>(
+ getConfiguration(),
+ entry.getKey());
for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
- if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
+ if (tmpPartition.putVertex(vertex) != null) {
throw new IllegalStateException(
- "exchangeVertexRanges: Vertex " + vertex +
+ "moveVerticesToWorker: Vertex " + vertex +
" already exists!");
}
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("moveVerticesToWorker: Adding " +
+ entry.getValue().size() +
+ " vertices for partition id " + entry.getKey());
+ }
+ getPartitionMap().put(tmpPartition.getPartitionId(),
+ tmpPartition);
entry.getValue().clear();
}
+ inPartitionVertexMap.clear();
}
}
- @Override
- public NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap() {
- return getVertexRangeMap(getSuperstep());
+ final public BspEvent getPartitionExchangeChildrenChangedEvent() {
+ return partitionExchangeChildrenChanged;
}
@Override
@@ -1423,8 +1292,53 @@ public class BspServiceWorker<
"processEvent: Couldn't properly get job state from " +
jsonObj.toString());
}
- return true;
+ foundEvent = true;
+ } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("processEvent : partitionExchangeChildrenChanged " +
+ "(at least one worker is done sending partitions)");
+ }
+ partitionExchangeChildrenChanged.signal();
+ foundEvent = true;
}
+
return foundEvent;
}
+
+ @Override
+ public WorkerInfo getWorkerInfo() {
+ return workerInfo;
+ }
+
+ @Override
+ public Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
+ return workerPartitionMap;
+ }
+
+ @Override
+ public Collection<? extends PartitionOwner> getPartitionOwners() {
+ return workerGraphPartitioner.getPartitionOwners();
+ }
+
+ @Override
+ public PartitionOwner getVertexPartitionOwner(I vertexIndex) {
+ return workerGraphPartitioner.getPartitionOwner(vertexIndex);
+ }
+
+ public Partition<I, V, E, M> getPartition(I vertexIndex) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ return workerPartitionMap.get(partitionOwner.getPartitionId());
+ }
+
+ @Override
+ public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
+ return workerPartitionMap.get(
+ partitionOwner.getPartitionId()).getVertex(vertexIndex);
+ } else {
+ return null;
+ }
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Nov 15 00:54:20 2011
@@ -18,6 +18,9 @@
package org.apache.giraph.graph;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -29,6 +32,56 @@ import org.apache.hadoop.util.Reflection
*/
public class BspUtils {
/**
+ * Get the user's subclassed {@link GraphPartitionerFactory}.
+ *
+ * @param conf Configuration to check
+ * @return User's graph partitioner factory class
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ getGraphPartitionerClass(Configuration conf) {
+ return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+ conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
+ HashPartitionerFactory.class,
+ GraphPartitionerFactory.class);
+ }
+
+ /**
+ * Create a user graph partitioner factory
+ *
+ * @param conf Configuration to check
+ * @return Instantiated user graph partitioner factory
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ GraphPartitionerFactory<I, V, E, M>
+ createGraphPartitioner(Configuration conf) {
+ Class<? extends GraphPartitionerFactory<I, V, E, M>>
+ graphPartitionerFactoryClass =
+ getGraphPartitionerClass(conf);
+ return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
+ }
+
+ /**
+ * Create the partition stats object of the user's graph partitioner
+ *
+ * @param conf Configuration to check
+ * @return Instantiated partition stats of the user's graph partitioner
+ */
+ @SuppressWarnings("rawtypes")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ PartitionStats createGraphPartitionStats(Configuration conf) {
+ GraphPartitionerFactory<I, V, E, M> graphPartitioner =
+ createGraphPartitioner(conf);
+ return graphPartitioner.createMasterGraphPartitioner().
+ createPartitionStats();
+ }
+
+ /**
* Get the user's subclassed {@link VertexInputFormat}.
*
* @param conf Configuration to check
@@ -131,41 +184,6 @@ public class BspUtils {
}
/**
- * Get the user's subclassed vertex range balancer
- *
- * @param conf Configuration to check
- * @return User's vertex range balancer class
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static <I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- Class<? extends VertexRangeBalancer<I, V, E, M>>
- getVertexRangeBalancerClass(Configuration conf) {
- return (Class<? extends VertexRangeBalancer<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_RANGE_BALANCER_CLASS,
- StaticBalancer.class,
- BasicVertexRangeBalancer.class);
- }
-
- /**
- * Create a user vertex range balancer class
- *
- * @param conf Configuration to check
- * @return Instantiated user vertex input format class
- */
- @SuppressWarnings("rawtypes")
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- VertexRangeBalancer<I, V, E, M>
- createVertexRangeBalancer(Configuration conf) {
- Class<? extends VertexRangeBalancer<I, V, E, M>>
- vertexRangeBalancerClass = getVertexRangeBalancerClass(conf);
- return ReflectionUtils.newInstance(vertexRangeBalancerClass, conf);
- }
-
- /**
* Get the user's subclassed VertexResolver.
*
* @param conf Configuration to check
@@ -202,7 +220,7 @@ public class BspUtils {
resolver.setGraphState(graphState);
return resolver;
}
-
+
/**
* Get the user's subclassed WorkerContext.
*
@@ -216,7 +234,7 @@ public class BspUtils {
DefaultWorkerContext.class,
WorkerContext.class);
}
-
+
/**
* Create a user worker context
*
@@ -224,12 +242,12 @@ public class BspUtils {
* @return Instantiated user worker context
*/
@SuppressWarnings("rawtypes")
- public static <I extends WritableComparable,
+ public static <I extends WritableComparable,
V extends Writable,
- E extends Writable,
- M extends Writable>
+ E extends Writable,
+ M extends Writable>
WorkerContext createWorkerContext(Configuration conf,
- GraphState<I, V, E, M> graphState) {
+ GraphState<I, V, E, M> graphState) {
Class<? extends WorkerContext> workerContextClass =
getWorkerContextClass(conf);
WorkerContext workerContext =
@@ -237,7 +255,7 @@ public class BspUtils {
workerContext.setGraphState(graphState);
return workerContext;
}
-
+
/**
* Get the user's subclassed Vertex.
@@ -268,8 +286,10 @@ public class BspUtils {
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
createVertex(Configuration conf) {
- Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
- BasicVertex<I, V, E, M> vertex = ReflectionUtils.newInstance(vertexClass, conf);
+ Class<? extends BasicVertex<I, V, E, M>> vertexClass =
+ getVertexClass(conf);
+ BasicVertex<I, V, E, M> vertex =
+ ReflectionUtils.newInstance(vertexClass, conf);
return vertex;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Nov 15 00:54:20 2011
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.BspOutputFormat;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
@@ -43,12 +44,12 @@ public class GiraphJob extends Job {
/** Vertex combiner class - optional */
public static final String VERTEX_COMBINER_CLASS =
"giraph.combinerClass";
- /** Vertex range balancer class - optional */
- public static final String VERTEX_RANGE_BALANCER_CLASS =
- "giraph.vertexRangeBalancerClass";
/** Vertex resolver class - optional */
public static final String VERTEX_RESOLVER_CLASS =
"giraph.vertexResolverClass";
+ /** Graph partitioner factory class - optional */
+ public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
+ "giraph.graphPartitionerFactoryClass";
/** Vertex index class */
public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
@@ -143,11 +144,20 @@ public class GiraphJob extends Job {
/** Default maximum number of RPC handlers */
public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
+ /**
+ * Maximum number of vertices per partition before sending.
+ * (input superstep only).
+ */
+ public static final String MAX_VERTICES_PER_PARTITION =
+ "giraph.maxVerticesPerPartition";
+ /** Default maximum number of vertices per partition before sending. */
+ public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000;
+
/** Maximum number of messages per peer before flush */
public static final String MSG_SIZE = "giraph.msgSize";
/** Default maximum number of messages per peer before flush */
public static final int MSG_SIZE_DEFAULT = 1000;
-
+
/** Number of flush threads per peer */
public static final String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
@@ -191,7 +201,7 @@ public class GiraphJob extends Job {
"giraph.zkJavaOpts";
/** Default java opts passed to ZooKeeper startup */
public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
- "-Xmx256m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+ "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
"-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
/**
@@ -372,16 +382,15 @@ public class GiraphJob extends Job {
}
/**
- * Set the vertex range balancer class (optional)
+ * Set the graph partitioner factory class (optional)
*
- * @param vertexRangeBalancerClass Determines how vertex
- * ranges are balanced prior to each superstep
+ * @param graphPartitionerFactoryClass Determines how the graph is partitioned
*/
- final public void setVertexRangeBalancerClass(
- Class<?> vertexRangeBalancerClass) {
- getConfiguration().setClass(VERTEX_RANGE_BALANCER_CLASS,
- vertexRangeBalancerClass,
- VertexRangeBalancer.class);
+ final public void setGraphPartitionerFactoryClass(
+ Class<?> graphPartitionerFactoryClass) {
+ getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+ graphPartitionerFactoryClass,
+ GraphPartitionerFactory.class);
}
/**
@@ -394,7 +403,7 @@ public class GiraphJob extends Job {
vertexResolverClass,
VertexResolver.class);
}
-
+
/**
* Set the worker context class (optional)
*
@@ -406,7 +415,7 @@ public class GiraphJob extends Job {
workerContextClass,
WorkerContext.class);
}
-
+
/**
* Set worker configuration for determining what is required for
* a superstep.
@@ -499,6 +508,9 @@ public class GiraphJob extends Job {
setIntConfIfDefault("mapred.job.map.memory.mb", 1024);
setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
+ // Speculative execution doesn't make sense for Giraph
+ conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+
if (getJar() == null) {
setJarByClass(GiraphJob.class);
}
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java?rev=1201987&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java Tue Nov 15 00:54:20 2011
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex, finished-vertex, edge and message totals aggregated by the master.
+ */
+public class GlobalStats implements Writable {
+ private long vertexCount = 0; // accumulated by addPartitionStats()
+ private long finishedVertexCount = 0; // accumulated by addPartitionStats()
+ private long edgeCount = 0; // accumulated by addPartitionStats()
+ private long messageCount = 0; // accumulated only by addMessageCount()
+
+ public void addPartitionStats(PartitionStats partitionStats) { // fold one partition's counts into the totals
+ this.vertexCount += partitionStats.getVertexCount();
+ this.finishedVertexCount += partitionStats.getFinishedVertexCount();
+ this.edgeCount += partitionStats.getEdgeCount();
+ } // messageCount is not touched by addPartitionStats()
+
+ public long getVertexCount() {
+ return vertexCount;
+ }
+
+ public long getFinishedVertexCount() {
+ return finishedVertexCount;
+ }
+
+ public long getEdgeCount() {
+ return edgeCount;
+ }
+
+ public long getMessageCount() {
+ return messageCount;
+ }
+
+ public void addMessageCount(long messageCount) { // adds to the running total; does not replace it
+ this.messageCount += messageCount;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException { // overwrites all four counters
+ vertexCount = input.readLong(); // read order mirrors write(): vertex, finished, edge, message
+ finishedVertexCount = input.readLong();
+ edgeCount = input.readLong();
+ messageCount = input.readLong();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException { // emits four longs in the order readFields() consumes them
+ output.writeLong(vertexCount);
+ output.writeLong(finishedVertexCount);
+ output.writeLong(edgeCount);
+ output.writeLong(messageCount);
+ }
+
+ @Override
+ public String toString() {
+ return "(vtx=" + vertexCount + ",finVtx=" +
+ finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
+ messageCount + ")";
+ }
+}