You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/08/05 23:47:46 UTC

git commit: updated refs/heads/trunk to fc26773

Repository: giraph
Updated Branches:
  refs/heads/trunk d455270e2 -> fc2677348


GIRAPH-893: Implement preLoad & postSave on workerObservers (pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fc267734
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fc267734
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fc267734

Branch: refs/heads/trunk
Commit: fc267734887dddd7c379144d6d5499fe3d541db8
Parents: d455270
Author: Pavan Kumar <pa...@fb.com>
Authored: Tue Aug 5 14:35:04 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Tue Aug 5 14:35:32 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/giraph/graph/GraphTaskManager.java   | 22 ++++++++++
 .../apache/giraph/utils/JMapHistoDumper.java    | 12 +++++-
 .../org/apache/giraph/utils/LogVersions.java    |  6 +++
 .../giraph/utils/ReactiveJMapHistoDumper.java   | 12 +++++-
 .../giraph/worker/DefaultWorkerObserver.java    | 16 +++++--
 .../apache/giraph/worker/WorkerObserver.java    | 44 +++++++++++++-------
 7 files changed, 90 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3c8e155..300215a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-893: Implement preLoad & postSave on workerObservers (pavanka)
+
   GIRAPH-936: AsyncMessageStoreWrapper threads are not daemonized (edunov via majakabiljo)
 
   GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 684f4eb..6ebb002 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -255,6 +255,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     if (checkTaskState()) {
       return;
     }
+    preLoadOnWorkerObservers();
     finishedSuperstepStats = serviceWorker.setup();
     if (collectInputSuperstepStats(finishedSuperstepStats)) {
       return;
@@ -830,6 +831,26 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
+   * Executes preLoad() on worker observers.
+   */
+  private void preLoadOnWorkerObservers() {
+    for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+      obs.preLoad();
+      context.progress();
+    }
+  }
+
+  /**
+   * Executes postSave() on worker observers.
+   */
+  private void postSaveOnWorkerObservers() {
+    for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+      obs.postSave();
+      context.progress();
+    }
+  }
+
+  /**
    * Called by owner of this GraphTaskManager object on each compute node
    */
   public void cleanup()
@@ -843,6 +864,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
 
     if (serviceWorker != null) {
       serviceWorker.cleanup(finishedSuperstepStats);
+      postSaveOnWorkerObservers();
     }
     try {
       if (masterThread != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index 3bcf42e..f90337f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -45,17 +45,25 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
   private boolean stop = false;
 
   @Override
-  public void preApplication() {
+  public void preLoad() {
     // This is called by both WorkerObserver and MasterObserver
     startJMapThread();
   }
 
   @Override
-  public void postApplication() {
+  public void postSave() {
     // This is called by both WorkerObserver and MasterObserver
     joinJMapThread();
   }
 
+  @Override
+  public void preApplication() {
+  }
+
+  @Override
+  public void postApplication() {
+  }
+
   /**
    * Join the jmap thread
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
index 8305df7..5bdad87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
@@ -37,6 +37,12 @@ public class LogVersions implements WorkerObserver, MasterObserver {
   }
 
   @Override
+  public void preLoad() { }
+
+  @Override
+  public void postSave() { }
+
+  @Override
   public void preApplication() {
     GiraphDepVersions.get().logVersionsUsed();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
index 68369d9..844f929 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
@@ -52,17 +52,25 @@ public class ReactiveJMapHistoDumper extends
   private volatile boolean stop = false;
 
   @Override
-  public void preApplication() {
+  public void preLoad() {
     // This is called by both WorkerObserver and MasterObserver
     startSupervisorThread();
   }
 
   @Override
-  public void postApplication() {
+  public void postSave() {
     // This is called by both WorkerObserver and MasterObserver
     joinSupervisorThread();
   }
 
+  @Override
+  public void preApplication() {
+  }
+
+  @Override
+  public void postApplication() {
+  }
+
   /**
    * Join the supervisor thread
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
index 5c8c94a..694c4ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
@@ -18,18 +18,26 @@
 
 package org.apache.giraph.worker;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 
 /**
  * Default Observer for Worker that does nothing.
  */
-public class DefaultWorkerObserver implements WorkerObserver,
-    ImmutableClassesGiraphConfigurable {
-  /** The configuration stored here */
+public class DefaultWorkerObserver implements WorkerObserver {
+  /**
+   * The configuration stored here
+   */
   private ImmutableClassesGiraphConfiguration conf;
 
   @Override
+  public void preLoad() {
+  }
+
+  @Override
+  public void postSave() {
+  }
+
+  @Override
   public void preApplication() {
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
index fc62629..b1b40db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
@@ -18,39 +18,51 @@
 
 package org.apache.giraph.worker;
 
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+
 /**
- * Observer for Worker
+ * Observer for worker. The user can subclass and register an observer with the
+ * Giraph framework. The framework will execute methods of the observer at
+ * designated moments of computation on each worker.
  */
-public interface WorkerObserver {
+public interface WorkerObserver extends ImmutableClassesGiraphConfigurable {
   /**
-   * Initialize the WorkerContext.
-   * This method is executed once on each Worker before the first
-   * superstep starts.
+   * Initialize the observer. This method is executed once on each worker before
+   * loading.
    */
-  void preApplication();
+  void preLoad();
 
   /**
-   * Finalize the WorkerContext.
-   * This method is executed once on each Worker after the last
-   * superstep ends.
+   * Initialize the observer. This method is executed once on each worker after
+   * loading before the first superstep starts.
    */
-  void postApplication();
+  void preApplication();
 
   /**
-   * Execute user code.
-   * This method is executed once on each Worker before each
-   * superstep starts.
+   * Execute the observer. This method is executed once on each worker before
+   * each superstep starts.
    *
    * @param superstep number of superstep
    */
   void preSuperstep(long superstep);
 
   /**
-   * Execute user code.
-   * This method is executed once on each Worker after each
-   * superstep ends.
+   * Execute the observer. This method is executed once on each worker after
+   * each superstep ends.
    *
    * @param superstep number of superstep
    */
   void postSuperstep(long superstep);
+
+  /**
+   * Finalize the observer. This method is executed once on each worker after
+   * the last superstep ends before saving.
+   */
+  void postApplication();
+
+  /**
+   * Finalize the observer. This method is executed once on each worker after
+   * saving.
+   */
+  void postSave();
 }