You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/03/07 22:39:34 UTC

git commit: updated refs/heads/trunk to 6d603ec

Repository: giraph
Updated Branches:
  refs/heads/trunk 17fac7292 -> 6d603ec7d


GIRAPH-868: Fix race condition with WorkerProgress (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6d603ec7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6d603ec7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6d603ec7

Branch: refs/heads/trunk
Commit: 6d603ec7ddbb78cfa1b0bf14e3efa22a0676f5c8
Parents: 17fac72
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Mar 7 13:38:56 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Mar 7 13:38:56 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                           |  1 +
 .../org/apache/giraph/worker/BspServiceWorker.java  |  1 -
 .../apache/giraph/worker/WorkerProgressWriter.java  | 16 ++++++++++++----
 3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index fa93d13..c241971 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-868: Fix race condition with WorkerProgress (majakabiljo)
 
   GIRAPH-867: Fix comments in PageRankComputation (ssc)
   

http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index c0b28dd..6b721b3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1180,7 +1180,6 @@ public class BspServiceWorker<I extends WritableComparable,
     saveEdges();
     WorkerProgress.get().finishStoring();
     if (workerProgressWriter != null) {
-      WorkerProgress.writeToZnode(getZkExt(), myProgressPath);
       workerProgressWriter.stop();
     }
     getPartitionStore().shutdown();

http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index 95e46e4..4ff5bb1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -35,6 +35,10 @@ public class WorkerProgressWriter {
   private final Thread writerThread;
   /** Whether worker finished application */
   private volatile boolean finished = false;
+  /** Path where this worker's progress should be stored */
+  private final String myProgressPath;
+  /** ZooKeeperExt */
+  private final ZooKeeperExt zk;
 
   /**
    * Constructor, starts separate thread to periodically update worker's
@@ -43,15 +47,17 @@ public class WorkerProgressWriter {
    * @param myProgressPath Path where this worker's progress should be stored
    * @param zk ZooKeeperExt
    */
-  public WorkerProgressWriter(final String myProgressPath,
-      final ZooKeeperExt zk) {
+  public WorkerProgressWriter(String myProgressPath, ZooKeeperExt zk) {
+    this.myProgressPath = myProgressPath;
+    this.zk = zk;
     writerThread = new Thread(new Runnable() {
       @Override
       public void run() {
         try {
           while (!finished) {
             WorkerProgress.get().updateMemory();
-            WorkerProgress.writeToZnode(zk, myProgressPath);
+            WorkerProgress.writeToZnode(WorkerProgressWriter.this.zk,
+                WorkerProgressWriter.this.myProgressPath);
             double factor = 1 + Math.random();
             Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
           }
@@ -69,8 +75,10 @@ public class WorkerProgressWriter {
   /**
    * Stop the thread which writes worker's progress
    */
-  public void stop() {
+  public void stop() throws InterruptedException {
     finished = true;
     writerThread.interrupt();
+    writerThread.join();
+    WorkerProgress.writeToZnode(zk, myProgressPath);
   }
 }