You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/08/05 23:47:46 UTC
git commit: updated refs/heads/trunk to fc26773
Repository: giraph
Updated Branches:
refs/heads/trunk d455270e2 -> fc2677348
GIRAPH-893: Implement preLoad & postSave on workerObservers (pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fc267734
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fc267734
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fc267734
Branch: refs/heads/trunk
Commit: fc267734887dddd7c379144d6d5499fe3d541db8
Parents: d455270
Author: Pavan Kumar <pa...@fb.com>
Authored: Tue Aug 5 14:35:04 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Tue Aug 5 14:35:32 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/giraph/graph/GraphTaskManager.java | 22 ++++++++++
.../apache/giraph/utils/JMapHistoDumper.java | 12 +++++-
.../org/apache/giraph/utils/LogVersions.java | 6 +++
.../giraph/utils/ReactiveJMapHistoDumper.java | 12 +++++-
.../giraph/worker/DefaultWorkerObserver.java | 16 +++++--
.../apache/giraph/worker/WorkerObserver.java | 44 +++++++++++++-------
7 files changed, 90 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3c8e155..300215a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-893: Implement preLoad & postSave on workerObservers (pavanka)
+
GIRAPH-936: AsyncMessageStoreWrapper threads are not daemonized (edunov via majakabiljo)
GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 684f4eb..6ebb002 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -255,6 +255,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
if (checkTaskState()) {
return;
}
+ preLoadOnWorkerObservers();
finishedSuperstepStats = serviceWorker.setup();
if (collectInputSuperstepStats(finishedSuperstepStats)) {
return;
@@ -830,6 +831,26 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
/**
+ * Executes preLoad() on worker observers.
+ */
+ private void preLoadOnWorkerObservers() {
+ for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+ obs.preLoad();
+ context.progress();
+ }
+ }
+
+ /**
+ * Executes postSave() on worker observers.
+ */
+ private void postSaveOnWorkerObservers() {
+ for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+ obs.postSave();
+ context.progress();
+ }
+ }
+
+ /**
* Called by owner of this GraphTaskManager object on each compute node
*/
public void cleanup()
@@ -843,6 +864,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
if (serviceWorker != null) {
serviceWorker.cleanup(finishedSuperstepStats);
+ postSaveOnWorkerObservers();
}
try {
if (masterThread != null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index 3bcf42e..f90337f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -45,17 +45,25 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
private boolean stop = false;
@Override
- public void preApplication() {
+ public void preLoad() {
// This is called by both WorkerObserver and MasterObserver
startJMapThread();
}
@Override
- public void postApplication() {
+ public void postSave() {
// This is called by both WorkerObserver and MasterObserver
joinJMapThread();
}
+ @Override
+ public void preApplication() {
+ }
+
+ @Override
+ public void postApplication() {
+ }
+
/**
* Join the jmap thread
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
index 8305df7..5bdad87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
@@ -37,6 +37,12 @@ public class LogVersions implements WorkerObserver, MasterObserver {
}
@Override
+ public void preLoad() { }
+
+ @Override
+ public void postSave() { }
+
+ @Override
public void preApplication() {
GiraphDepVersions.get().logVersionsUsed();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
index 68369d9..844f929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
@@ -52,17 +52,25 @@ public class ReactiveJMapHistoDumper extends
private volatile boolean stop = false;
@Override
- public void preApplication() {
+ public void preLoad() {
// This is called by both WorkerObserver and MasterObserver
startSupervisorThread();
}
@Override
- public void postApplication() {
+ public void postSave() {
// This is called by both WorkerObserver and MasterObserver
joinSupervisorThread();
}
+ @Override
+ public void preApplication() {
+ }
+
+ @Override
+ public void postApplication() {
+ }
+
/**
* Join the supervisor thread
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
index 5c8c94a..694c4ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
@@ -18,18 +18,26 @@
package org.apache.giraph.worker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
/**
* Default Observer for Worker that does nothing.
*/
-public class DefaultWorkerObserver implements WorkerObserver,
- ImmutableClassesGiraphConfigurable {
- /** The configuration stored here */
+public class DefaultWorkerObserver implements WorkerObserver {
+ /**
+ * The configuration stored here
+ */
private ImmutableClassesGiraphConfiguration conf;
@Override
+ public void preLoad() {
+ }
+
+ @Override
+ public void postSave() {
+ }
+
+ @Override
public void preApplication() {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
index fc62629..b1b40db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
@@ -18,39 +18,51 @@
package org.apache.giraph.worker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+
/**
- * Observer for Worker
+ * Observer for worker. The user can subclass and register an observer with the
+ * Giraph framework. The framework will execute methods of the observer at
+ * designated moments of computation on each worker.
*/
-public interface WorkerObserver {
+public interface WorkerObserver extends ImmutableClassesGiraphConfigurable {
/**
- * Initialize the WorkerContext.
- * This method is executed once on each Worker before the first
- * superstep starts.
+ * Initialize the observer. This method is executed once on each worker before
+ * loading.
*/
- void preApplication();
+ void preLoad();
/**
- * Finalize the WorkerContext.
- * This method is executed once on each Worker after the last
- * superstep ends.
+ * Initialize the observer. This method is executed once on each worker after
+ * loading before the first superstep starts.
*/
- void postApplication();
+ void preApplication();
/**
- * Execute user code.
- * This method is executed once on each Worker before each
- * superstep starts.
+ * Execute the observer. This method is executed once on each worker before
+ * each superstep starts.
*
* @param superstep number of superstep
*/
void preSuperstep(long superstep);
/**
- * Execute user code.
- * This method is executed once on each Worker after each
- * superstep ends.
+ * Execute the observer. This method is executed once on each worker after
+ * each superstep ends.
*
* @param superstep number of superstep
*/
void postSuperstep(long superstep);
+
+ /**
+ * Finalize the observer. This method is executed once on each worker after
+ * the last superstep ends before saving.
+ */
+ void postApplication();
+
+ /**
+ * Finalize the observer. This method is executed once on each worker after
+ * saving.
+ */
+ void postSave();
}