You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/14 22:57:08 UTC
svn commit: r1373091 - in /giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/graph/BspService.java
src/main/java/org/apache/giraph/graph/BspServiceMaster.java
src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Author: aching
Date: Tue Aug 14 20:57:07 2012
New Revision: 1373091
URL: http://svn.apache.org/viewvc?rev=1373091&view=rev
Log:
GIRAPH-297: Checkpointing on master is done one superstep later
(majakabiljo via aching).
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 14 20:57:07 2012
@@ -2,19 +2,24 @@ Giraph Change Log
Release 0.2.0 - unreleased
- GIRAPH-275: Restore data locality to workers reading InputSplits where possible
- without querying NameNode, ZooKeeper. (Eli Reisman via jghoman)
+ GIRAPH-297: Checkpointing on master is done one superstep later
+ (majakabiljo via aching).
+
+ GIRAPH-275: Restore data locality to workers reading InputSplits
+ where possible without querying NameNode, ZooKeeper. (Eli Reisman
+ via jghoman)
GIRAPH-258: Check type compatibility before submitting job.
(Eli Reisman via jghoman)
- GIRAPH-218: Consolidate all I/O Format classes under one roof in lib/ directory.
- (Eli Reisman via jghoman)
+ GIRAPH-218: Consolidate all I/O Format classes under one roof in
+ lib/ directory. (Eli Reisman via jghoman)
GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
- GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of
- vertices results in wide variance in RPC message sizes. (Eli Reisman via jghoman)
+ GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP
+ by # of vertices results in wide variance in RPC message sizes. (Eli
+ Reisman via jghoman)
GIRAPH-290: Add committer information for Alessandro Presta to pom.xml
(apresta)
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Tue Aug 14 20:57:07 2012
@@ -103,6 +103,9 @@ public abstract class BspService<I exten
public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
/** Unhealthy workers register here. */
public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
+ /** Workers which wrote checkpoint notify here */
+ public static final String WORKER_WROTE_CHECKPOINT_DIR =
+ "/_workerWroteCheckpointDir";
/** Finished workers notify here */
public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
/** Where the partition assignments are set */
@@ -435,6 +438,20 @@ public abstract class BspService<I exten
}
/**
+ * Generate the worker "wrote checkpoint" directory path for a
+ * superstep
+ *
+ * @param attempt application attempt number
+ * @param superstep superstep to use
+ * @return directory path based on the a superstep
+ */
+ public final String getWorkerWroteCheckpointPath(long attempt,
+ long superstep) {
+ return applicationAttemptsPath + "/" + attempt +
+ SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
+ }
+
+ /**
* Generate the worker "finished" directory path for a
* superstep
*
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Aug 14 20:57:07 2012
@@ -123,6 +123,8 @@ public class BspServiceMaster<I extends
private final int partitionLongTailMinPrint;
/** Last finalized checkpoint */
private long lastCheckpointedSuperstep = -1;
+ /** Worker wrote checkpoint */
+ private final BspEvent workerWroteCheckpoint;
/** State of the superstep changed */
private final BspEvent superstepStateChanged;
/** Master graph partitioner */
@@ -151,6 +153,8 @@ public class BspServiceMaster<I extends
Mapper<?, ?, ?, ?>.Context context,
GraphMapper<I, V, E, M> graphMapper) {
super(serverPortList, sessionMsecTimeout, context, graphMapper);
+ workerWroteCheckpoint = new PredicateLock(context);
+ registerBspEvent(workerWroteCheckpoint);
superstepStateChanged = new PredicateLock(context);
registerBspEvent(superstepStateChanged);
@@ -1398,6 +1402,54 @@ public class BspServiceMaster<I extends
chosenWorkerInfoList,
masterGraphPartitioner);
+ // Finalize the valid checkpoint file prefixes and possibly
+ // the aggregators.
+ if (checkpointFrequencyMet(getSuperstep())) {
+ String workerWroteCheckpointPath =
+ getWorkerWroteCheckpointPath(getApplicationAttempt(),
+ getSuperstep());
+ // first wait for all the workers to write their checkpoint data
+ if (!barrierOnWorkerList(workerWroteCheckpointPath,
+ chosenWorkerInfoList,
+ getWorkerWroteCheckpointEvent())) {
+ return SuperstepState.WORKER_FAILURE;
+ }
+ try {
+ finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "coordinateSuperstep: IOException on finalizing checkpoint",
+ e);
+ }
+ }
+
+ // Clean up the old supersteps (always keep this one)
+ long removeableSuperstep = getSuperstep() - 1;
+ if (!(getConfiguration().getBoolean(
+ GiraphJob.KEEP_ZOOKEEPER_DATA,
+ GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+ (removeableSuperstep >= 0)) {
+ String oldSuperstepPath =
+ getSuperstepPath(getApplicationAttempt()) + "/" +
+ removeableSuperstep;
+ try {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
+ oldSuperstepPath);
+ }
+ getZkExt().deleteExt(oldSuperstepPath,
+ -1,
+ true);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("coordinateBarrier: Already cleaned up " +
+ oldSuperstepPath);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "coordinateSuperstep: KeeperException on " +
+ "finalizing checkpoint", e);
+ }
+ }
+
if (getSuperstep() == INPUT_SUPERSTEP) {
// Coordinate the workers finishing sending their vertices to the
// correct workers and signal when everything is done.
@@ -1468,44 +1520,6 @@ public class BspServiceMaster<I extends
globalStats.getMessageCount() -
sentMessagesCounter.getValue());
- // Finalize the valid checkpoint file prefixes and possibly
- // the aggregators.
- if (checkpointFrequencyMet(getSuperstep())) {
- try {
- finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
- } catch (IOException e) {
- throw new IllegalStateException(
- "coordinateSuperstep: IOException on finalizing checkpoint",
- e);
- }
- }
-
- // Clean up the old supersteps (always keep this one)
- long removeableSuperstep = getSuperstep() - 1;
- if (!(getConfiguration().getBoolean(
- GiraphJob.KEEP_ZOOKEEPER_DATA,
- GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
- (removeableSuperstep >= 0)) {
- String oldSuperstepPath =
- getSuperstepPath(getApplicationAttempt()) + "/" +
- removeableSuperstep;
- try {
- if (LOG.isInfoEnabled()) {
- LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
- oldSuperstepPath);
- }
- getZkExt().deleteExt(oldSuperstepPath,
- -1,
- true);
- } catch (KeeperException.NoNodeException e) {
- LOG.warn("coordinateBarrier: Already cleaned up " +
- oldSuperstepPath);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "coordinateSuperstep: KeeperException on " +
- "finalizing checkpoint", e);
- }
- }
incrCachedSuperstep();
// Counter starts at zero, so no need to increment
if (getSuperstep() > 0) {
@@ -1718,6 +1732,15 @@ public class BspServiceMaster<I extends
}
/**
+ * Event that the master watches that denotes when a worker wrote checkpoint
+ *
+ * @return Event that denotes when a worker wrote checkpoint
+ */
+ public final BspEvent getWorkerWroteCheckpointEvent() {
+ return workerWroteCheckpoint;
+ }
+
+ /**
* Event that the master watches that denotes if a worker has done something
* that changes the state of a superstep (either a worker completed or died)
*
@@ -1779,6 +1802,14 @@ public class BspServiceMaster<I extends
}
superstepStateChanged.signal();
foundEvent = true;
+ } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
+ event.getType() == EventType.NodeChildrenChanged) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
+ "event - workerWroteCheckpoint signaled");
+ }
+ workerWroteCheckpoint.signal();
+ foundEvent = true;
}
return foundEvent;
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Aug 14 20:57:07 2012
@@ -1212,6 +1212,27 @@ public class BspServiceWorker<I extends
}
getFs().createNewFile(validFilePath);
+
+ // Notify master that checkpoint is stored
+ String workerWroteCheckpoint =
+ getWorkerWroteCheckpointPath(getApplicationAttempt(),
+ getSuperstep()) + "/" + getHostnamePartitionId();
+ try {
+ getZkExt().createExt(workerWroteCheckpoint,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.warn("finishSuperstep: wrote checkpoint worker path " +
+ workerWroteCheckpoint + " already exists!");
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+ " failed with KeeperException", e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+ " failed with InterruptedException", e);
+ }
}
@Override