You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2015/05/26 22:05:48 UTC

git commit: updated refs/heads/trunk to c8078a2

Repository: giraph
Updated Branches:
  refs/heads/trunk 4b743f163 -> c8078a2f3


Gets rid of spammy ZooKeeper messages regarding 'lost reservation' at the end of the workers' log

Summary:
Once a Giraph job with more than one worker finishes running, ZooKeeper usually produces a lot of
unnecessary messages that mistakenly report a 'lost reservation' of input splits. This happens
because one worker closes the EPHEMERAL ZooKeeper node, which triggers an event on the other workers
for each existing input split. Since there is a single EPHEMERAL ZooKeeper instance, closing it
is interpreted as the deletion/loss of input splits by the other workers. These events are watched by
ZooKeeperExt in the other workers, causing 'lost reservation' messages, one for each input split, at
the end of the worker log.

The fix for this problem is to ignore the aforementioned events once the computation has passed INPUT_SUPERSTEP.

[JIRA number is GIRAPH-1009]

Test Plan:
Running a Giraph job on more than one worker without this patch produces the 'lost reservation' messages at the
end of the worker log. Applying this patch eliminates these messages.

'mvn clean verify' returns successfully with this patch.

Reviewers: dionysis.logothetis, avery.ching

Reviewed By: avery.ching

Differential Revision: https://reviews.facebook.net/D38949


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c8078a2f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c8078a2f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c8078a2f

Branch: refs/heads/trunk
Commit: c8078a2f3348f53d2abd276977323aff74c34f1c
Parents: 4b743f1
Author: Hassan Eslami <he...@heslami-mbp.dhcp.thefacebook.com>
Authored: Tue May 26 12:56:20 2015 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Tue May 26 13:05:39 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  3 +++
 .../apache/giraph/worker/BspServiceWorker.java  | 22 ++++++++++++++++----
 .../giraph/worker/InputSplitsHandler.java       | 17 +++++++++++----
 3 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c8078a2f/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index bb5e7e8..8836f3c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-1009: Spammy 'lost reservation' messages from ZooKeeper in workers' log at the end of
+  the computation. (heslami via aching)
+
   GIRAPH-1008: Create Computation per thread instead of per partition (majakabiljo)
 
   GIRAPH-1004: Allow changing hadoop output format (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/c8078a2f/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 2f1c2ef..5ec1872 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -180,6 +180,11 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Time spent waiting on requests to finish */
   private GiraphTimer waitRequestsTimer;
 
+  /** InputSplit handlers used in INPUT_SUPERSTEP for vertex splits */
+  private InputSplitsHandler vertexSplitsHandler;
+  /** InputSplit handlers used in INPUT_SUPERSTEP for edge splits */
+  private InputSplitsHandler edgeSplitsHandler;
+
   /**
    * Constructor for setting up the worker.
    *
@@ -235,6 +240,8 @@ public class BspServiceWorker<I extends WritableComparable,
         null;
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
+    vertexSplitsHandler = null;
+    edgeSplitsHandler = null;
   }
 
   @Override
@@ -390,7 +397,7 @@ public class BspServiceWorker<I extends WritableComparable,
         new InputSplitPathOrganizer(getZkExt(),
             inputSplitPathList, getWorkerInfo().getHostname(),
             getConfiguration().useInputSplitLocality());
-    InputSplitsHandler splitsHandler = new InputSplitsHandler(
+    vertexSplitsHandler = new InputSplitsHandler(
         splitOrganizer,
         getZkExt(),
         getContext(),
@@ -403,7 +410,7 @@ public class BspServiceWorker<I extends WritableComparable,
             getContext(),
             getConfiguration(),
             this,
-            splitsHandler,
+            vertexSplitsHandler,
             getZkExt());
 
     return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
@@ -424,7 +431,7 @@ public class BspServiceWorker<I extends WritableComparable,
         new InputSplitPathOrganizer(getZkExt(),
             inputSplitPathList, getWorkerInfo().getHostname(),
             getConfiguration().useInputSplitLocality());
-    InputSplitsHandler splitsHandler = new InputSplitsHandler(
+    edgeSplitsHandler = new InputSplitsHandler(
         splitOrganizer,
         getZkExt(),
         getContext(),
@@ -437,7 +444,7 @@ public class BspServiceWorker<I extends WritableComparable,
             getContext(),
             getConfiguration(),
             this,
-            splitsHandler,
+            edgeSplitsHandler,
             getZkExt());
 
     return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
@@ -895,6 +902,13 @@ public class BspServiceWorker<I extends WritableComparable,
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
       postSuperstepCallbacks();
+    } else {
+      if (getConfiguration().hasVertexInputFormat()) {
+        vertexSplitsHandler.setDoneReadingGraph(true);
+      }
+      if (getConfiguration().hasEdgeInputFormat()) {
+        edgeSplitsHandler.setDoneReadingGraph(true);
+      }
     }
 
     globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);

http://git-wip-us.apache.org/repos/asf/giraph/blob/c8078a2f/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
index f7d11a3..e2099eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsHandler.java
@@ -54,6 +54,9 @@ public class InputSplitsHandler implements Watcher  {
   private final String inputSplitReservedNode;
   /** ZooKeeper input split finished node. */
   private final String inputSplitFinishedNode;
+  /** Specifies if we finished execution of INPUT_SUPERSTEP. The variable may
+   * be accessed via different threads. */
+  private volatile boolean doneReadingGraph;
 
   /**
    * Constructor
@@ -73,10 +76,14 @@ public class InputSplitsHandler implements Watcher  {
     this.context = context;
     this.inputSplitReservedNode = inputSplitReservedNode;
     this.inputSplitFinishedNode = inputSplitFinishedNode;
+    this.doneReadingGraph = false;
   }
 
+  public void setDoneReadingGraph(boolean doneReadingGraph) {
+    this.doneReadingGraph = doneReadingGraph;
+  }
 
-  /**
+   /**
    * Try to reserve an InputSplit for loading.  While InputSplits exists that
    * are not finished, wait until they are.
    *
@@ -182,10 +189,12 @@ public class InputSplitsHandler implements Watcher  {
           "state " + event.getState() + ", event type " + event.getType());
       return;
     }
-    // Check if the reservation for the input split was lost
-    // (some worker died)
+    // Check if the reservation for the input split was lost in INPUT_SUPERSTEP
+    // (some worker died). If INPUT_SUPERSTEP has already completed, we ignore
+    // this event.
     if (event.getPath().endsWith(inputSplitReservedNode) &&
-        event.getType() == Watcher.Event.EventType.NodeDeleted) {
+        event.getType() == Watcher.Event.EventType.NodeDeleted &&
+        !doneReadingGraph) {
       synchronized (pathList) {
         String split = event.getPath();
         split = split.substring(0, split.indexOf(inputSplitReservedNode));