You are viewing a plain-text version of this content. (The canonical link that appeared here was lost in the plain-text rendering.)
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/05 23:44:47 UTC

git commit: GIRAPH-501: WorkerObserver

Updated Branches:
  refs/heads/trunk e91875175 -> c1ef88914


GIRAPH-501: WorkerObserver


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1ef8891
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1ef8891
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1ef8891

Branch: refs/heads/trunk
Commit: c1ef889142ae5e2526f8b83138d0dfee31fb7db6
Parents: e918751
Author: Nitay Joffe <ni...@apache.org>
Authored: Tue Feb 5 14:35:19 2013 -0500
Committer: Nitay Joffe <ni...@apache.org>
Committed: Tue Feb 5 17:43:26 2013 -0500

----------------------------------------------------------------------
 .../giraph/bsp/CentralizedServiceWorker.java       |   27 +++++---
 .../apache/giraph/conf/GiraphConfiguration.java    |   25 ++++++-
 .../org/apache/giraph/conf/GiraphConstants.java    |    4 +-
 .../conf/ImmutableClassesGiraphConfiguration.java  |   15 ++++
 .../org/apache/giraph/graph/GraphTaskManager.java  |   36 ++++++++--
 .../org/apache/giraph/worker/BspServiceWorker.java |   34 ++++++++--
 .../giraph/worker/DefaultWorkerObserver.java       |   57 +++++++++++++++
 .../org/apache/giraph/worker/WorkerObserver.java   |   56 ++++++++++++++
 8 files changed, 228 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 71f8f72..56b5d03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -18,26 +18,26 @@
 
 package org.apache.giraph.bsp;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.graph.VertexEdgeCount;
-import org.apache.giraph.worker.WorkerAggregatorHandler;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerAggregatorHandler;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.giraph.worker.WorkerContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
 
 /**
  * All workers should have access to this centralized service to
@@ -82,6 +82,13 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
   WorkerContext getWorkerContext();
 
   /**
+   * Get the observers for this Worker.
+   *
+   * @return array of WorkerObservers.
+   */
+  WorkerObserver[] getWorkerObservers();
+
+  /**
    * Get the partition store for this worker.
    * The partitions contain the vertices for
    * this worker and can be used to run compute() for the vertices or do

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index b3b9c4b..9ca1e7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -32,6 +32,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -115,6 +116,17 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Add a WorkerObserver class (optional)
+   *
+   * @param workerObserverClass WorkerObserver class to add.
+   */
+  public final void addWorkerObserverClass(
+      Class<? extends WorkerObserver> workerObserverClass) {
+    addToClasses(WORKER_OBSERVER_CLASSES, workerObserverClass,
+        WorkerObserver.class);
+  }
+
+  /**
    * Get job observer class
    *
    * @return GiraphJobObserver class set.
@@ -271,9 +283,7 @@ public class GiraphConfiguration extends Configuration
    */
   public final void setWorkerContextClass(
       Class<? extends WorkerContext> workerContextClass) {
-    setClass(WORKER_CONTEXT_CLASS,
-        workerContextClass,
-        WorkerContext.class);
+    setClass(WORKER_CONTEXT_CLASS, workerContextClass, WorkerContext.class);
   }
 
   /**
@@ -356,6 +366,15 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get array of WorkerObserver classes set in configuration.
+   *
+   * @return array of WorkerObserver classes.
+   */
+  public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
+    return getClassesOfType(WORKER_OBSERVER_CLASSES, WorkerObserver.class);
+  }
+
+  /**
    * Whether to track, print, and aggregate metrics.
    *
    * @return true if metrics are enabled, false otherwise (default)

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 51415c2..fb4e8a3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -27,8 +27,10 @@ public interface GiraphConstants {
 
   /** Class for Master - optional */
   String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
-  /** Classes for Observer Master - optional */
+  /** Classes for Master Observer - optional */
   String MASTER_OBSERVER_CLASSES = "giraph.master.observers";
+  /** Classes for Worker Observer - optional */
+  String WORKER_OBSERVER_CLASSES = "giraph.worker.observers";
   /** Vertex combiner class - optional */
   String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
   /** Vertex resolver class - optional */

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index d75d624..30a7da7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -41,6 +41,7 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -417,6 +418,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create one WorkerObserver per configured WorkerObserver class.
+   *
+   * @return new observer instances, in the same order as their classes.
+   */
+  public WorkerObserver[] createWorkerObservers() {
+    Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
+    WorkerObserver[] objects = new WorkerObserver[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+    }
+    return objects;
+  }
+
+  /**
    * Create job observer
    * @return GiraphJobObserver set in configuration.
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 4ede8bb..f7fb7e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -45,6 +45,7 @@ import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
 import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -252,8 +253,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       context.progress();
       graphState = checkSuperstepRestarted(
         aggregatorUsage, superstep, graphState);
-      GiraphTimerContext perSuperstepTimer = prepareForSuperstep(graphState);
-      perSuperstepTimer.stop();
+      prepareForSuperstep(graphState);
       context.progress();
       MessageStoreByPartition<I, M> messageStore =
         serviceWorker.getServerData().getCurrentMessageStore();
@@ -279,10 +279,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
       LOG.info("execute: BSP application done (global vertices marked done)");
     }
     updateSuperstepGraphState(aggregatorUsage);
+    postApplication();
+  }
+
+  /**
+   * Run post-application callbacks: WorkerContext first, then observers.
+   */
+  private void postApplication() {
     GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
     serviceWorker.getWorkerContext().postApplication();
     postAppTimerContext.stop();
     context.progress();
+
+    for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+      obs.postApplication();
+      context.progress();
+    }
   }
 
   /**
@@ -365,15 +377,20 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
    * Utility function to prepare various objects managing BSP superstep
    * operations for the next superstep.
    * @param graphState graph state metadata object
-   * @return the timer context for superstep metrics
    */
-  private GiraphTimerContext prepareForSuperstep(
-    GraphState<I, V, E, M> graphState) {
+  private void prepareForSuperstep(GraphState<I, V, E, M> graphState) {
     serviceWorker.prepareSuperstep();
+
     serviceWorker.getWorkerContext().setGraphState(graphState);
-    GiraphTimerContext perSuperstepTimer = wcPreSuperstepTimer.time();
+    GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
     serviceWorker.getWorkerContext().preSuperstep();
-    return perSuperstepTimer;
+    preSuperstepTimer.stop();
+    context.progress();
+
+    for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+      obs.preSuperstep(graphState.getSuperstep());
+      context.progress();
+    }
   }
 
   /**
@@ -819,6 +836,11 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     }
     preAppTimerContext.stop();
     context.progress();
+
+    for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+      obs.preApplication();
+      context.progress();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 71ea749..e48e01a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -147,6 +147,9 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Handler for aggregators */
   private final WorkerAggregatorHandler aggregatorHandler;
 
+  /** array of observers to call back to */
+  private final WorkerObserver[] observers;
+
   // Per-Superstep Metrics
   /** Timer for WorkerContext#postSuperstep */
   private GiraphTimer wcPostSuperstepTimer;
@@ -191,6 +194,8 @@ public class BspServiceWorker<I extends WritableComparable,
     aggregatorHandler =
         new WorkerAggregatorHandler(this, getConfiguration(), context);
 
+    observers = getConfiguration().createWorkerObservers();
+
     GiraphMetrics.get().addSuperstepResetObserver(this);
   }
 
@@ -208,6 +213,11 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   @Override
+  public WorkerObserver[] getWorkerObservers() {
+    return observers;
+  }
+
+  @Override
   public WorkerClient<I, V, E, M> getWorkerClient() {
     return workerClient;
   }
@@ -750,11 +760,7 @@ else[HADOOP_NON_SECURE]*/
     }
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      getWorkerContext().setGraphState(graphState);
-      GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
-      getWorkerContext().postSuperstep();
-      timerContext.stop();
-      getContext().progress();
+      postSuperstepCallbacks(graphState);
     }
 
     aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -801,6 +807,24 @@ else[HADOOP_NON_SECURE]*/
   }
 
   /**
+   * Run post-superstep callbacks: WorkerContext first, then observers.
+   *
+   * @param graphState current graph state (superstep passed to observers)
+   */
+  private void postSuperstepCallbacks(GraphState<I, V, E, M> graphState) {
+    getWorkerContext().setGraphState(graphState);
+    GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+    getWorkerContext().postSuperstep();
+    timerContext.stop();
+    getContext().progress();
+
+    for (WorkerObserver obs : getWorkerObservers()) {
+      obs.postSuperstep(graphState.getSuperstep());
+      getContext().progress();
+    }
+  }
+
+  /**
    * Wait for all the requests to finish.
    */
   private void waitForRequestsToFinish() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
new file mode 100644
index 0000000..5c8c94a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+
+/**
+ * No-op WorkerObserver; extend it and override only the needed callbacks.
+ */
+public class DefaultWorkerObserver implements WorkerObserver,
+    ImmutableClassesGiraphConfigurable {
+  /** Configuration supplied through setConf() */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  @Override
+  public void preApplication() {
+  }
+
+  @Override
+  public void postApplication() {
+  }
+
+  @Override
+  public void preSuperstep(long superstep) {
+  }
+
+  @Override
+  public void postSuperstep(long superstep) {
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.conf = configuration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
new file mode 100644
index 0000000..fc62629
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+/**
+ * Observer notified of worker lifecycle events (application and superstep).
+ */
+public interface WorkerObserver {
+  /**
+   * Notification that the application is starting.
+   * This method is executed once on each Worker before the first
+   * superstep starts.
+   */
+  void preApplication();
+
+  /**
+   * Notification that the application has finished.
+   * This method is executed once on each Worker after the last
+   * superstep ends, after WorkerContext#postApplication().
+   */
+  void postApplication();
+
+  /**
+   * Notification that a superstep is about to start.
+   * This method is executed once on each Worker before each
+   * superstep starts, after WorkerContext#preSuperstep().
+   *
+   * @param superstep number of the superstep about to start
+   */
+  void preSuperstep(long superstep);
+
+  /**
+   * Notification that a superstep has finished.
+   * This method is executed once on each Worker after each
+   * non-input superstep ends, after WorkerContext#postSuperstep().
+   *
+   * @param superstep number of the superstep that just ended
+   */
+  void postSuperstep(long superstep);
+}