You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/03/07 22:39:34 UTC
git commit: updated refs/heads/trunk to 6d603ec
Repository: giraph
Updated Branches:
refs/heads/trunk 17fac7292 -> 6d603ec7d
GIRAPH-868: Fix race condition with WorkerProgress (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6d603ec7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6d603ec7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6d603ec7
Branch: refs/heads/trunk
Commit: 6d603ec7ddbb78cfa1b0bf14e3efa22a0676f5c8
Parents: 17fac72
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Mar 7 13:38:56 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Mar 7 13:38:56 2014 -0800
----------------------------------------------------------------------
CHANGELOG | 1 +
.../org/apache/giraph/worker/BspServiceWorker.java | 1 -
.../apache/giraph/worker/WorkerProgressWriter.java | 16 ++++++++++++----
3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index fa93d13..c241971 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,7 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-868: Fix race condition with WorkerProgress (majakabiljo)
GIRAPH-867: Fix comments in PageRankComputation (ssc)
http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index c0b28dd..6b721b3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1180,7 +1180,6 @@ public class BspServiceWorker<I extends WritableComparable,
saveEdges();
WorkerProgress.get().finishStoring();
if (workerProgressWriter != null) {
- WorkerProgress.writeToZnode(getZkExt(), myProgressPath);
workerProgressWriter.stop();
}
getPartitionStore().shutdown();
http://git-wip-us.apache.org/repos/asf/giraph/blob/6d603ec7/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index 95e46e4..4ff5bb1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -35,6 +35,10 @@ public class WorkerProgressWriter {
private final Thread writerThread;
/** Whether worker finished application */
private volatile boolean finished = false;
+ /** Path where this worker's progress should be stored */
+ private final String myProgressPath;
+ /** ZooKeeperExt */
+ private final ZooKeeperExt zk;
/**
* Constructor, starts separate thread to periodically update worker's
@@ -43,15 +47,17 @@ public class WorkerProgressWriter {
* @param myProgressPath Path where this worker's progress should be stored
* @param zk ZooKeeperExt
*/
- public WorkerProgressWriter(final String myProgressPath,
- final ZooKeeperExt zk) {
+ public WorkerProgressWriter(String myProgressPath, ZooKeeperExt zk) {
+ this.myProgressPath = myProgressPath;
+ this.zk = zk;
writerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (!finished) {
WorkerProgress.get().updateMemory();
- WorkerProgress.writeToZnode(zk, myProgressPath);
+ WorkerProgress.writeToZnode(WorkerProgressWriter.this.zk,
+ WorkerProgressWriter.this.myProgressPath);
double factor = 1 + Math.random();
Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor));
}
@@ -69,8 +75,10 @@ public class WorkerProgressWriter {
/**
* Stop the thread which writes worker's progress
*/
- public void stop() {
+ public void stop() throws InterruptedException {
finished = true;
writerThread.interrupt();
+ writerThread.join();
+ WorkerProgress.writeToZnode(zk, myProgressPath);
}
}