You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/25 05:38:14 UTC
git commit: GIRAPH-491: Observer for job lifecycle (nitay)
Updated Branches:
refs/heads/trunk 0e1dd3291 -> f8ee22a11
GIRAPH-491: Observer for job lifecycle (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f8ee22a1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f8ee22a1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f8ee22a1
Branch: refs/heads/trunk
Commit: f8ee22a119366d7c48cbbfd5f985d4811c0b3794
Parents: 0e1dd32
Author: Nitay Joffe <ni...@apache.org>
Authored: Thu Jan 24 16:32:28 2013 -0800
Committer: Nitay Joffe <ni...@apache.org>
Committed: Thu Jan 24 20:38:06 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/giraph/conf/GiraphConfiguration.java | 31 +++++++--
.../org/apache/giraph/conf/GiraphConstants.java | 3 +
.../conf/ImmutableClassesGiraphConfiguration.java | 23 ++++++-
.../apache/giraph/graph/DefaultJobObserver.java | 52 +++++++++++++++
.../java/org/apache/giraph/graph/GiraphJob.java | 18 ++++--
.../org/apache/giraph/graph/GiraphJobObserver.java | 39 +++++++++++
.../org/apache/giraph/utils/ReflectionUtils.java | 5 +-
8 files changed, 156 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2d429f9..27669ba 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-491: Observer for job lifecycle (nitay)
+
GIRAPH-490: Constants for GiraphStats / GiraphTimers (nitay)
GIRAPH-488: ArrayOutOfBoundsException in org.apache.giraph.worker.InputSplitPathOrganizer (ereisman)
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index df7b80e..3e29a83 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,16 +20,18 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.DefaultJobObserver;
+import org.apache.giraph.graph.GiraphJobObserver;
+import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
/**
@@ -110,6 +112,25 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Get job observer class
+ *
+ * @return GiraphJobObserver class set.
+ */
+ public Class<? extends GiraphJobObserver> getJobObserverClass() {
+ return getClass(JOB_OBSERVER_CLASS, DefaultJobObserver.class,
+ GiraphJobObserver.class);
+ }
+
+ /**
+ * Set job observer class
+ *
+ * @param klass GiraphJobObserver class to set.
+ */
+ public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
+ setClass(JOB_OBSERVER_CLASS, klass, GiraphJobObserver.class);
+ }
+
+ /**
* Add a class to a property that is a list of classes. If the property does
* not exist it will be created.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8e75e5b..51415c2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -43,6 +43,9 @@ public interface GiraphConstants {
String GRAPH_PARTITIONER_FACTORY_CLASS =
"giraph.graphPartitionerFactoryClass";
+ /** Observer class to watch over job status - optional */
+ String JOB_OBSERVER_CLASS = "giraph.jobObserverClass";
+
// At least one of the input format classes is required.
/** VertexInputFormat class */
String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f2c8701..1f47039 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.GiraphJobObserver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.master.MasterCompute;
@@ -57,8 +58,8 @@ import org.apache.hadoop.util.Progressable;
* @param <M> Message data
*/
public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> extends
- GiraphConfiguration {
+ V extends Writable, E extends Writable, M extends Writable>
+ extends GiraphConfiguration {
/** Master graph partitioner - cached for fast access */
protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
@@ -89,6 +90,16 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Configure an object with this instance if the object is configurable.
+ * @param obj Object
+ */
+ public void configureIfPossible(Object obj) {
+ if (obj instanceof ImmutableClassesGiraphConfigurable) {
+ ((ImmutableClassesGiraphConfigurable) obj).setConf(this);
+ }
+ }
+
+ /**
* Get the user's subclassed
* {@link org.apache.giraph.partition.GraphPartitionerFactory}.
*
@@ -406,6 +417,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create job observer
+ * @return GiraphJobObserver set in configuration.
+ */
+ public GiraphJobObserver getJobObserver() {
+ return ReflectionUtils.newInstance(getJobObserverClass(), this);
+ }
+
+ /**
* Get the user's subclassed edge value class.
*
* @return User's vertex edge value class
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
new file mode 100644
index 0000000..9f0418a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Default implementation of job observer that does nothing.
+ */
+public class DefaultJobObserver implements GiraphJobObserver,
+ ImmutableClassesGiraphConfigurable {
+ /** configuration object stored here */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void launchingJob(Job jobToSubmit) {
+ // do nothing
+ }
+
+ @Override
+ public void jobFinished(Job jobToSubmit, boolean passed) {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
index 6e93a2f..18c0db2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
@@ -264,18 +264,24 @@ public class GiraphJob {
}
// Set the job properties, check them, and submit the job
- ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+ ImmutableClassesGiraphConfiguration conf =
new ImmutableClassesGiraphConfiguration(giraphConfiguration);
- checkConfiguration(immutableClassesGiraphConfiguration);
- checkLocalJobRunnerConfiguration(immutableClassesGiraphConfiguration);
- Job submittedJob = new Job(immutableClassesGiraphConfiguration, jobName);
+ checkConfiguration(conf);
+ checkLocalJobRunnerConfiguration(conf);
+ Job submittedJob = new Job(conf, jobName);
if (submittedJob.getJar() == null) {
- submittedJob.setJarByClass(GiraphJob.class);
+ submittedJob.setJarByClass(getClass());
}
submittedJob.setNumReduceTasks(0);
submittedJob.setMapperClass(GraphMapper.class);
submittedJob.setInputFormatClass(BspInputFormat.class);
submittedJob.setOutputFormatClass(BspOutputFormat.class);
- return submittedJob.waitForCompletion(verbose);
+
+ GiraphJobObserver jobObserver = conf.getJobObserver();
+ jobObserver.launchingJob(submittedJob);
+ boolean passed = submittedJob.waitForCompletion(verbose);
+ jobObserver.jobFinished(submittedJob, passed);
+
+ return passed;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
new file mode 100644
index 0000000..7aec8c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * An observer over the job launch lifecycle.
+ */
+public interface GiraphJobObserver {
+ /**
+ * Callback for job about to start.
+ * @param jobToSubmit Job we're going to submit to hadoop.
+ */
+ void launchingJob(Job jobToSubmit);
+
+ /**
+ * Callback when job finishes.
+ * @param submittedJob Job that ran in hadoop.
+ * @param passed true if job succeeded.
+ */
+ void jobFinished(Job submittedJob, boolean passed);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index d0a4a15..ae2c556 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -18,7 +18,6 @@
package org.apache.giraph.utils;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import java.lang.reflect.Array;
@@ -184,9 +183,7 @@ public class ReflectionUtils {
"newInstance: Illegal access " + theClass.getName(), e);
}
if (configuration != null) {
- if (result instanceof ImmutableClassesGiraphConfigurable) {
- ((ImmutableClassesGiraphConfigurable) result).setConf(configuration);
- }
+ configuration.configureIfPossible(result);
}
return result;
}