You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/14 22:57:08 UTC

svn commit: r1373091 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/BspService.java src/main/java/org/apache/giraph/graph/BspServiceMaster.java src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Author: aching
Date: Tue Aug 14 20:57:07 2012
New Revision: 1373091

URL: http://svn.apache.org/viewvc?rev=1373091&view=rev
Log:
GIRAPH-297: Checkpointing on master is done one superstep later
(majakabiljo via aching).

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 14 20:57:07 2012
@@ -2,19 +2,24 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
-  GIRAPH-275: Restore data locality to workers reading InputSplits where possible
-  without querying NameNode, ZooKeeper. (Eli Reisman via jghoman)
+  GIRAPH-297: Checkpointing on master is done one superstep later
+  (majakabiljo via aching).
+
+  GIRAPH-275: Restore data locality to workers reading InputSplits
+  where possible without querying NameNode, ZooKeeper. (Eli Reisman
+  via jghoman)
   
   GIRAPH-258: Check type compatibility before submitting job. 
   (Eli Reisman via jghoman)
 
-  GIRAPH-218: Consolidate all I/O Format classes under one roof in lib/ directory.
-  (Eli Reisman via jghoman)
+  GIRAPH-218: Consolidate all I/O Format classes under one roof in
+  lib/ directory.  (Eli Reisman via jghoman)
 
   GIRAPH-259: TestBspBasic.testBspPageRank is broken (majakabiljo via apresta)
 
-  GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP by # of 
-  vertices results in wide variance in RPC message sizes. (Eli Reisman via jghoman)
+  GIRAPH-256: Partitioning outgoing graph data during INPUT_SUPERSTEP
+  by # of vertices results in wide variance in RPC message sizes. (Eli
+  Reisman via jghoman)
   
   GIRAPH-290: Add committer information for Alessandro Presta to pom.xml
   (apresta)

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Tue Aug 14 20:57:07 2012
@@ -103,6 +103,9 @@ public abstract class BspService<I exten
   public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
   /** Unhealthy workers register here. */
   public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir";
+  /** Workers which wrote checkpoint notify here */
+  public static final String WORKER_WROTE_CHECKPOINT_DIR =
+      "/_workerWroteCheckpointDir";
   /** Finished workers notify here */
   public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
   /** Where the partition assignments are set */
@@ -435,6 +438,20 @@ public abstract class BspService<I exten
   }
 
   /**
+   * Generate the worker "wrote checkpoint" directory path for a
+   * superstep
+   *
+   * @param attempt application attempt number
+   * @param superstep superstep to use
+   * @return directory path based on the a superstep
+   */
+  public final String getWorkerWroteCheckpointPath(long attempt,
+      long superstep) {
+    return applicationAttemptsPath + "/" + attempt +
+        SUPERSTEP_DIR + "/" + superstep + WORKER_WROTE_CHECKPOINT_DIR;
+  }
+
+  /**
    * Generate the worker "finished" directory path for a
    * superstep
    *

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Aug 14 20:57:07 2012
@@ -123,6 +123,8 @@ public class BspServiceMaster<I extends 
   private final int partitionLongTailMinPrint;
   /** Last finalized checkpoint */
   private long lastCheckpointedSuperstep = -1;
+  /** Worker wrote checkpoint */
+  private final BspEvent workerWroteCheckpoint;
   /** State of the superstep changed */
   private final BspEvent superstepStateChanged;
   /** Master graph partitioner */
@@ -151,6 +153,8 @@ public class BspServiceMaster<I extends 
       Mapper<?, ?, ?, ?>.Context context,
       GraphMapper<I, V, E, M> graphMapper) {
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    workerWroteCheckpoint = new PredicateLock(context);
+    registerBspEvent(workerWroteCheckpoint);
     superstepStateChanged = new PredicateLock(context);
     registerBspEvent(superstepStateChanged);
 
@@ -1398,6 +1402,54 @@ public class BspServiceMaster<I extends 
         chosenWorkerInfoList,
         masterGraphPartitioner);
 
+    // Finalize the valid checkpoint file prefixes and possibly
+    // the aggregators.
+    if (checkpointFrequencyMet(getSuperstep())) {
+      String workerWroteCheckpointPath =
+          getWorkerWroteCheckpointPath(getApplicationAttempt(),
+              getSuperstep());
+      // first wait for all the workers to write their checkpoint data
+      if (!barrierOnWorkerList(workerWroteCheckpointPath,
+          chosenWorkerInfoList,
+          getWorkerWroteCheckpointEvent())) {
+        return SuperstepState.WORKER_FAILURE;
+      }
+      try {
+        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "coordinateSuperstep: IOException on finalizing checkpoint",
+            e);
+      }
+    }
+
+    // Clean up the old supersteps (always keep this one)
+    long removeableSuperstep = getSuperstep() - 1;
+    if (!(getConfiguration().getBoolean(
+        GiraphJob.KEEP_ZOOKEEPER_DATA,
+        GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+        (removeableSuperstep >= 0)) {
+      String oldSuperstepPath =
+          getSuperstepPath(getApplicationAttempt()) + "/" +
+              removeableSuperstep;
+      try {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
+              oldSuperstepPath);
+        }
+        getZkExt().deleteExt(oldSuperstepPath,
+            -1,
+            true);
+      } catch (KeeperException.NoNodeException e) {
+        LOG.warn("coordinateBarrier: Already cleaned up " +
+            oldSuperstepPath);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "coordinateSuperstep: KeeperException on " +
+                "finalizing checkpoint", e);
+      }
+    }
+
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Coordinate the workers finishing sending their vertices to the
       // correct workers and signal when everything is done.
@@ -1468,44 +1520,6 @@ public class BspServiceMaster<I extends 
         globalStats.getMessageCount() -
         sentMessagesCounter.getValue());
 
-    // Finalize the valid checkpoint file prefixes and possibly
-    // the aggregators.
-    if (checkpointFrequencyMet(getSuperstep())) {
-      try {
-        finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "coordinateSuperstep: IOException on finalizing checkpoint",
-            e);
-      }
-    }
-
-    // Clean up the old supersteps (always keep this one)
-    long removeableSuperstep = getSuperstep() - 1;
-    if (!(getConfiguration().getBoolean(
-        GiraphJob.KEEP_ZOOKEEPER_DATA,
-        GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
-        (removeableSuperstep >= 0)) {
-      String oldSuperstepPath =
-          getSuperstepPath(getApplicationAttempt()) + "/" +
-              removeableSuperstep;
-      try {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("coordinateSuperstep: Cleaning up old Superstep " +
-              oldSuperstepPath);
-        }
-        getZkExt().deleteExt(oldSuperstepPath,
-            -1,
-            true);
-      } catch (KeeperException.NoNodeException e) {
-        LOG.warn("coordinateBarrier: Already cleaned up " +
-            oldSuperstepPath);
-      } catch (KeeperException e) {
-        throw new IllegalStateException(
-            "coordinateSuperstep: KeeperException on " +
-                "finalizing checkpoint", e);
-      }
-    }
     incrCachedSuperstep();
     // Counter starts at zero, so no need to increment
     if (getSuperstep() > 0) {
@@ -1718,6 +1732,15 @@ public class BspServiceMaster<I extends 
   }
 
   /**
+   * Event that the master watches that denotes when a worker wrote checkpoint
+   *
+   * @return Event that denotes when a worker wrote checkpoint
+   */
+  public final BspEvent getWorkerWroteCheckpointEvent() {
+    return workerWroteCheckpoint;
+  }
+
+  /**
    * Event that the master watches that denotes if a worker has done something
    * that changes the state of a superstep (either a worker completed or died)
    *
@@ -1779,6 +1802,14 @@ public class BspServiceMaster<I extends 
       }
       superstepStateChanged.signal();
       foundEvent = true;
+    } else if (event.getPath().contains(WORKER_WROTE_CHECKPOINT_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("processEvent: Worker wrote checkpoint (node change) " +
+            "event - workerWroteCheckpoint signaled");
+      }
+      workerWroteCheckpoint.signal();
+      foundEvent = true;
     }
 
     return foundEvent;

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1373091&r1=1373090&r2=1373091&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Aug 14 20:57:07 2012
@@ -1212,6 +1212,27 @@ public class BspServiceWorker<I extends 
     }
 
     getFs().createNewFile(validFilePath);
+
+    // Notify master that checkpoint is stored
+    String workerWroteCheckpoint =
+        getWorkerWroteCheckpointPath(getApplicationAttempt(),
+            getSuperstep()) + "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(workerWroteCheckpoint,
+          new byte[0],
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: wrote checkpoint worker path " +
+          workerWroteCheckpoint + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+          " failed with InterruptedException", e);
+    }
   }
 
   @Override