You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/25 05:38:14 UTC

git commit: GIRAPH-491: Observer for job lifecycle (nitay)

Updated Branches:
  refs/heads/trunk 0e1dd3291 -> f8ee22a11


GIRAPH-491: Observer for job lifecycle (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f8ee22a1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f8ee22a1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f8ee22a1

Branch: refs/heads/trunk
Commit: f8ee22a119366d7c48cbbfd5f985d4811c0b3794
Parents: 0e1dd32
Author: Nitay Joffe <ni...@apache.org>
Authored: Thu Jan 24 16:32:28 2013 -0800
Committer: Nitay Joffe <ni...@apache.org>
Committed: Thu Jan 24 20:38:06 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   31 +++++++--
 .../org/apache/giraph/conf/GiraphConstants.java    |    3 +
 .../conf/ImmutableClassesGiraphConfiguration.java  |   23 ++++++-
 .../apache/giraph/graph/DefaultJobObserver.java    |   52 +++++++++++++++
 .../java/org/apache/giraph/graph/GiraphJob.java    |   18 ++++--
 .../org/apache/giraph/graph/GiraphJobObserver.java |   39 +++++++++++
 .../org/apache/giraph/utils/ReflectionUtils.java   |    5 +-
 8 files changed, 156 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2d429f9..27669ba 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-491: Observer for job lifecycle (nitay)
+
   GIRAPH-490: Constants for GiraphStats / GiraphTimers (nitay)
 
   GIRAPH-488: ArrayOutOfBoundsException in org.apache.giraph.worker.InputSplitPathOrganizer (ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index df7b80e..3e29a83 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,16 +20,18 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.DefaultJobObserver;
+import org.apache.giraph.graph.GiraphJobObserver;
+import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
-import org.apache.giraph.master.MasterCompute;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -110,6 +112,25 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get job observer class
+   *
+   * @return GiraphJobObserver class set.
+   */
+  public Class<? extends GiraphJobObserver> getJobObserverClass() {
+    return getClass(JOB_OBSERVER_CLASS, DefaultJobObserver.class,
+        GiraphJobObserver.class);
+  }
+
+  /**
+   * Set job observer class
+   *
+   * @param klass GiraphJobObserver class to set.
+   */
+  public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) {
+    setClass(JOB_OBSERVER_CLASS, klass, GiraphJobObserver.class);
+  }
+
+  /**
    * Add a class to a property that is a list of classes. If the property does
    * not exist it will be created.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8e75e5b..51415c2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -43,6 +43,9 @@ public interface GiraphConstants {
   String GRAPH_PARTITIONER_FACTORY_CLASS =
       "giraph.graphPartitionerFactoryClass";
 
+  /** Observer class to watch over job status - optional */
+  String JOB_OBSERVER_CLASS = "giraph.jobObserverClass";
+
   // At least one of the input format classes is required.
   /** VertexInputFormat class */
   String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index f2c8701..1f47039 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.graph.GiraphJobObserver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.master.MasterCompute;
@@ -57,8 +58,8 @@ import org.apache.hadoop.util.Progressable;
  * @param <M> Message data
  */
 public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> extends
-    GiraphConfiguration {
+    V extends Writable, E extends Writable, M extends Writable>
+    extends GiraphConfiguration {
   /** Master graph partitioner - cached for fast access */
   protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
 
@@ -89,6 +90,16 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Configure an object with this instance if the object is configurable.
+   * @param obj Object
+   */
+  public void configureIfPossible(Object obj) {
+    if (obj instanceof ImmutableClassesGiraphConfigurable) {
+      ((ImmutableClassesGiraphConfigurable) obj).setConf(this);
+    }
+  }
+
+  /**
    * Get the user's subclassed
    * {@link org.apache.giraph.partition.GraphPartitionerFactory}.
    *
@@ -406,6 +417,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create job observer
+   * @return GiraphJobObserver set in configuration.
+   */
+  public GiraphJobObserver getJobObserver() {
+    return ReflectionUtils.newInstance(getJobObserverClass(), this);
+  }
+
+  /**
    * Get the user's subclassed edge value class.
    *
    * @return User's vertex edge value class

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
new file mode 100644
index 0000000..9f0418a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Default implementation of job observer that does nothing.
+ */
+public class DefaultJobObserver implements GiraphJobObserver,
+    ImmutableClassesGiraphConfigurable {
+  /** configuration object stored here */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void launchingJob(Job jobToSubmit) {
+    // do nothing
+  }
+
+  @Override
+  public void jobFinished(Job jobToSubmit, boolean passed) {
+    // do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
index 6e93a2f..18c0db2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java
@@ -264,18 +264,24 @@ public class GiraphJob {
     }
 
     // Set the job properties, check them, and submit the job
-    ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+    ImmutableClassesGiraphConfiguration conf =
         new ImmutableClassesGiraphConfiguration(giraphConfiguration);
-    checkConfiguration(immutableClassesGiraphConfiguration);
-    checkLocalJobRunnerConfiguration(immutableClassesGiraphConfiguration);
-    Job submittedJob = new Job(immutableClassesGiraphConfiguration, jobName);
+    checkConfiguration(conf);
+    checkLocalJobRunnerConfiguration(conf);
+    Job submittedJob = new Job(conf, jobName);
     if (submittedJob.getJar() == null) {
-      submittedJob.setJarByClass(GiraphJob.class);
+      submittedJob.setJarByClass(getClass());
     }
     submittedJob.setNumReduceTasks(0);
     submittedJob.setMapperClass(GraphMapper.class);
     submittedJob.setInputFormatClass(BspInputFormat.class);
     submittedJob.setOutputFormatClass(BspOutputFormat.class);
-    return submittedJob.waitForCompletion(verbose);
+
+    GiraphJobObserver jobObserver = conf.getJobObserver();
+    jobObserver.launchingJob(submittedJob);
+    boolean passed = submittedJob.waitForCompletion(verbose);
+    jobObserver.jobFinished(submittedJob, passed);
+
+    return passed;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
new file mode 100644
index 0000000..7aec8c2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * An observer over the job launch lifecycle.
+ */
+public interface GiraphJobObserver {
+  /**
+   * Callback for job about to start.
+   * @param jobToSubmit Job we're going to submit to hadoop.
+   */
+  void launchingJob(Job jobToSubmit);
+
+  /**
+   * Callback when job finishes.
+   * @param submittedJob Job that ran in hadoop.
+   * @param passed true if job succeeded.
+   */
+  void jobFinished(Job submittedJob, boolean passed);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index d0a4a15..ae2c556 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 
 import java.lang.reflect.Array;
@@ -184,9 +183,7 @@ public class ReflectionUtils {
           "newInstance: Illegal access " + theClass.getName(), e);
     }
     if (configuration != null) {
-      if (result instanceof ImmutableClassesGiraphConfigurable) {
-        ((ImmutableClassesGiraphConfigurable) result).setConf(configuration);
-      }
+      configuration.configureIfPossible(result);
     }
     return result;
   }