You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/05 23:44:47 UTC
git commit: GIRAPH-501: WorkerObserver
Updated Branches:
refs/heads/trunk e91875175 -> c1ef88914
GIRAPH-501: WorkerObserver
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1ef8891
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1ef8891
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1ef8891
Branch: refs/heads/trunk
Commit: c1ef889142ae5e2526f8b83138d0dfee31fb7db6
Parents: e918751
Author: Nitay Joffe <ni...@apache.org>
Authored: Tue Feb 5 14:35:19 2013 -0500
Committer: Nitay Joffe <ni...@apache.org>
Committed: Tue Feb 5 17:43:26 2013 -0500
----------------------------------------------------------------------
.../giraph/bsp/CentralizedServiceWorker.java | 27 +++++---
.../apache/giraph/conf/GiraphConfiguration.java | 25 ++++++-
.../org/apache/giraph/conf/GiraphConstants.java | 4 +-
.../conf/ImmutableClassesGiraphConfiguration.java | 15 ++++
.../org/apache/giraph/graph/GraphTaskManager.java | 36 ++++++++--
.../org/apache/giraph/worker/BspServiceWorker.java | 34 ++++++++--
.../giraph/worker/DefaultWorkerObserver.java | 57 +++++++++++++++
.../org/apache/giraph/worker/WorkerObserver.java | 56 ++++++++++++++
8 files changed, 228 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 71f8f72..56b5d03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -18,26 +18,26 @@
package org.apache.giraph.bsp;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
-import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.graph.VertexEdgeCount;
-import org.apache.giraph.worker.WorkerAggregatorHandler;
+import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.worker.WorkerAggregatorHandler;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.giraph.worker.WorkerContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
/**
* All workers should have access to this centralized service to
@@ -82,6 +82,13 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
WorkerContext getWorkerContext();
/**
+ * Get the observers for this Worker.
+ *
+ * @return array of WorkerObservers.
+ */
+ WorkerObserver[] getWorkerObservers();
+
+ /**
* Get the partition store for this worker.
* The partitions contain the vertices for
* this worker and can be used to run compute() for the vertices or do
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index b3b9c4b..9ca1e7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -32,6 +32,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
/**
@@ -115,6 +116,17 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Add a WorkerObserver class (optional)
+ *
+ * @param workerObserverClass WorkerObserver class to add.
+ */
+ public final void addWorkerObserverClass(
+ Class<? extends WorkerObserver> workerObserverClass) {
+ addToClasses(WORKER_OBSERVER_CLASSES, workerObserverClass,
+ WorkerObserver.class);
+ }
+
+ /**
* Get job observer class
*
* @return GiraphJobObserver class set.
@@ -271,9 +283,7 @@ public class GiraphConfiguration extends Configuration
*/
public final void setWorkerContextClass(
Class<? extends WorkerContext> workerContextClass) {
- setClass(WORKER_CONTEXT_CLASS,
- workerContextClass,
- WorkerContext.class);
+ setClass(WORKER_CONTEXT_CLASS, workerContextClass, WorkerContext.class);
}
/**
@@ -356,6 +366,15 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Get array of WorkerObserver classes set in configuration.
+ *
+ * @return array of WorkerObserver classes.
+ */
+ public Class<? extends WorkerObserver>[] getWorkerObserverClasses() {
+ return getClassesOfType(WORKER_OBSERVER_CLASSES, WorkerObserver.class);
+ }
+
+ /**
* Whether to track, print, and aggregate metrics.
*
* @return true if metrics are enabled, false otherwise (default)
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 51415c2..fb4e8a3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -27,8 +27,10 @@ public interface GiraphConstants {
/** Class for Master - optional */
String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
- /** Classes for Observer Master - optional */
+ /** Classes for Master Observer - optional */
String MASTER_OBSERVER_CLASSES = "giraph.master.observers";
+ /** Classes for Worker Observer - optional */
+ String WORKER_OBSERVER_CLASSES = "giraph.worker.observers";
/** Vertex combiner class - optional */
String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
/** Vertex resolver class - optional */
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index d75d624..30a7da7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -41,6 +41,7 @@ import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -417,6 +418,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create array of WorkerObservers.
+ *
+ * @return Instantiated array of WorkerObservers.
+ */
+ public WorkerObserver[] createWorkerObservers() {
+ Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
+ WorkerObserver[] objects = new WorkerObserver[klasses.length];
+ for (int i = 0; i < klasses.length; ++i) {
+ objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+ }
+ return objects;
+ }
+
+ /**
* Create job observer
* @return GiraphJobObserver set in configuration.
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 4ede8bb..f7fb7e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -45,6 +45,7 @@ import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.worker.WorkerObserver;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -252,8 +253,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
context.progress();
graphState = checkSuperstepRestarted(
aggregatorUsage, superstep, graphState);
- GiraphTimerContext perSuperstepTimer = prepareForSuperstep(graphState);
- perSuperstepTimer.stop();
+ prepareForSuperstep(graphState);
context.progress();
MessageStoreByPartition<I, M> messageStore =
serviceWorker.getServerData().getCurrentMessageStore();
@@ -279,10 +279,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
LOG.info("execute: BSP application done (global vertices marked done)");
}
updateSuperstepGraphState(aggregatorUsage);
+ postApplication();
+ }
+
+ /**
+ * Handle post-application callbacks.
+ */
+ private void postApplication() {
GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
serviceWorker.getWorkerContext().postApplication();
postAppTimerContext.stop();
context.progress();
+
+ for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+ obs.postApplication();
+ context.progress();
+ }
}
/**
@@ -365,15 +377,20 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
* Utility function to prepare various objects managing BSP superstep
* operations for the next superstep.
* @param graphState graph state metadata object
- * @return the timer context for superstep metrics
*/
- private GiraphTimerContext prepareForSuperstep(
- GraphState<I, V, E, M> graphState) {
+ private void prepareForSuperstep(GraphState<I, V, E, M> graphState) {
serviceWorker.prepareSuperstep();
+
serviceWorker.getWorkerContext().setGraphState(graphState);
- GiraphTimerContext perSuperstepTimer = wcPreSuperstepTimer.time();
+ GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time();
serviceWorker.getWorkerContext().preSuperstep();
- return perSuperstepTimer;
+ preSuperstepTimer.stop();
+ context.progress();
+
+ for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+ obs.preSuperstep(graphState.getSuperstep());
+ context.progress();
+ }
}
/**
@@ -819,6 +836,11 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
preAppTimerContext.stop();
context.progress();
+
+ for (WorkerObserver obs : serviceWorker.getWorkerObservers()) {
+ obs.preApplication();
+ context.progress();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 71ea749..e48e01a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -147,6 +147,9 @@ public class BspServiceWorker<I extends WritableComparable,
/** Handler for aggregators */
private final WorkerAggregatorHandler aggregatorHandler;
+ /** Array of observers to call back to */
+ private final WorkerObserver[] observers;
+
// Per-Superstep Metrics
/** Timer for WorkerContext#postSuperstep */
private GiraphTimer wcPostSuperstepTimer;
@@ -191,6 +194,8 @@ public class BspServiceWorker<I extends WritableComparable,
aggregatorHandler =
new WorkerAggregatorHandler(this, getConfiguration(), context);
+ observers = getConfiguration().createWorkerObservers();
+
GiraphMetrics.get().addSuperstepResetObserver(this);
}
@@ -208,6 +213,11 @@ public class BspServiceWorker<I extends WritableComparable,
}
@Override
+ public WorkerObserver[] getWorkerObservers() {
+ return observers;
+ }
+
+ @Override
public WorkerClient<I, V, E, M> getWorkerClient() {
return workerClient;
}
@@ -750,11 +760,7 @@ else[HADOOP_NON_SECURE]*/
}
if (getSuperstep() != INPUT_SUPERSTEP) {
- getWorkerContext().setGraphState(graphState);
- GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
- getWorkerContext().postSuperstep();
- timerContext.stop();
- getContext().progress();
+ postSuperstepCallbacks(graphState);
}
aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
@@ -801,6 +807,24 @@ else[HADOOP_NON_SECURE]*/
}
/**
+ * Handle post-superstep callbacks
+ *
+ * @param graphState GraphState
+ */
+ private void postSuperstepCallbacks(GraphState<I, V, E, M> graphState) {
+ getWorkerContext().setGraphState(graphState);
+ GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+ getWorkerContext().postSuperstep();
+ timerContext.stop();
+ getContext().progress();
+
+ for (WorkerObserver obs : getWorkerObservers()) {
+ obs.postSuperstep(graphState.getSuperstep());
+ getContext().progress();
+ }
+ }
+
+ /**
* Wait for all the requests to finish.
*/
private void waitForRequestsToFinish() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
new file mode 100644
index 0000000..5c8c94a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+
+/**
+ * Default Observer for Worker that does nothing.
+ */
+public class DefaultWorkerObserver implements WorkerObserver,
+ ImmutableClassesGiraphConfigurable {
+ /** The configuration stored here */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ @Override
+ public void preApplication() {
+ }
+
+ @Override
+ public void postApplication() {
+ }
+
+ @Override
+ public void preSuperstep(long superstep) {
+ }
+
+ @Override
+ public void postSuperstep(long superstep) {
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ this.conf = configuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
new file mode 100644
index 0000000..fc62629
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+/**
+ * Observer for Worker
+ */
+public interface WorkerObserver {
+ /**
+ * Initialize the WorkerObserver.
+ * This method is executed once on each Worker before the first
+ * superstep starts.
+ */
+ void preApplication();
+
+ /**
+ * Finalize the WorkerObserver.
+ * This method is executed once on each Worker after the last
+ * superstep ends.
+ */
+ void postApplication();
+
+ /**
+ * Execute user code.
+ * This method is executed once on each Worker before each
+ * superstep starts.
+ *
+ * @param superstep number of superstep
+ */
+ void preSuperstep(long superstep);
+
+ /**
+ * Execute user code.
+ * This method is executed once on each Worker after each
+ * superstep ends.
+ *
+ * @param superstep number of superstep
+ */
+ void postSuperstep(long superstep);
+}