You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/08/23 18:53:44 UTC

git commit: updated refs/heads/trunk to 693a71c

Repository: giraph
Updated Branches:
  refs/heads/trunk 9d7bb4b52 -> 693a71c85


GIRAPH-1107: Allow observers to access job counters

Summary: From mapper/master/worker observer we might want to update some job counters for stats. For that we should allow observers to access job context.

Test Plan: Ran a job which accesses counters from WorkerObserver

Reviewers: sergey.edunov

Reviewed By: sergey.edunov

Differential Revision: https://reviews.facebook.net/D62391


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/693a71c8
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/693a71c8
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/693a71c8

Branch: refs/heads/trunk
Commit: 693a71c85a9e899b8f4054778e886fdcdd4470de
Parents: 9d7bb4b
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Aug 23 11:52:26 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Aug 23 11:52:26 2016 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/conf/ContextSettable.java | 34 ++++++++++++++++++++
 .../ImmutableClassesGiraphConfiguration.java    | 18 +++++++----
 .../apache/giraph/graph/GraphTaskManager.java   |  2 +-
 .../org/apache/giraph/graph/MapperObserver.java |  1 +
 .../apache/giraph/master/BspServiceMaster.java  |  2 +-
 .../apache/giraph/master/MasterObserver.java    |  1 +
 .../apache/giraph/utils/ReflectionUtils.java    | 24 ++++++++++++++
 .../apache/giraph/worker/BspServiceWorker.java  |  2 +-
 .../apache/giraph/worker/WorkerObserver.java    |  1 +
 9 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/conf/ContextSettable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ContextSettable.java b/giraph-core/src/main/java/org/apache/giraph/conf/ContextSettable.java
new file mode 100644
index 0000000..080a3b3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ContextSettable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Worker/Master/Mapper observer can implement this to get context set, to allow
+ * them to access job counters
+ */
+public interface ContextSettable {
+  /**
+   * Set context
+   *
+   * @param context Mapper context
+   */
+  void setContext(Mapper<?, ?, ?, ?>.Context context);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 1b79cba..91c8ed1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -734,13 +734,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   /**
    * Create array of MasterObservers.
    *
+   * @param context Mapper context
    * @return Instantiated array of MasterObservers.
    */
-  public MasterObserver[] createMasterObservers() {
+  public MasterObserver[] createMasterObservers(
+      Mapper<?, ?, ?, ?>.Context context) {
     Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
     MasterObserver[] objects = new MasterObserver[klasses.length];
     for (int i = 0; i < klasses.length; ++i) {
-      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
     }
     return objects;
   }
@@ -748,13 +750,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   /**
    * Create array of WorkerObservers.
    *
+   * @param context Mapper context
    * @return Instantiated array of WorkerObservers.
    */
-  public WorkerObserver[] createWorkerObservers() {
+  public WorkerObserver[] createWorkerObservers(
+      Mapper<?, ?, ?, ?>.Context context) {
     Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses();
     WorkerObserver[] objects = new WorkerObserver[klasses.length];
     for (int i = 0; i < klasses.length; ++i) {
-      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
     }
     return objects;
   }
@@ -762,13 +766,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   /**
    * Create array of MapperObservers.
    *
+   * @param context Mapper context
    * @return Instantiated array of MapperObservers.
    */
-  public MapperObserver[] createMapperObservers() {
+  public MapperObserver[] createMapperObservers(
+      Mapper<?, ?, ?, ?>.Context context) {
     Class<? extends MapperObserver>[] klasses = getMapperObserverClasses();
     MapperObserver[] objects = new MapperObserver[klasses.length];
     for (int i = 0; i < klasses.length; ++i) {
-      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
     }
     return objects;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 4d97e5f..339b5e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -930,7 +930,7 @@ end[PURE_YARN]*/
    * Setup mapper observers
    */
   public void setupMapperObservers() {
-    mapperObservers = conf.createMapperObservers();
+    mapperObservers = conf.createMapperObservers(context);
     for (MapperObserver mapperObserver : mapperObservers) {
       mapperObserver.setup();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
index cfbb421..8ebc233 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 /**
  * Mapper observer
+ * It can implement ContextSettable if it needs to access job counters.
  */
 public interface MapperObserver {
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 00da53c..971e266 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -223,7 +223,7 @@ public class BspServiceMaster<I extends WritableComparable,
     if (conf.isReactiveJmapHistogramDumpEnabled()) {
       conf.addMasterObserverClass(ReactiveJMapHistoDumper.class);
     }
-    observers = conf.createMasterObservers();
+    observers = conf.createMasterObservers(context);
 
     this.checkpointFrequency = conf.getCheckpointFrequency();
     this.checkpointStatus = CheckpointStatus.NONE;

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
index b12400a..3012308 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
@@ -22,6 +22,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 
 /**
  * Observer for Master.
+ * It can implement ContextSettable if it needs to access job counters.
  */
 public interface MasterObserver extends ImmutableClassesGiraphConfigurable {
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index 028f9e0..8abfbbc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -20,7 +20,9 @@ package org.apache.giraph.utils;
 
 import java.lang.reflect.Modifier;
 
+import org.apache.giraph.conf.ContextSettable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.jodah.typetools.TypeResolver;
 
 /**
@@ -115,6 +117,28 @@ public class ReflectionUtils {
   }
 
   /**
+   * Instantiate classes that are ImmutableClassesGiraphConfigurable,
+   * and optionally set context on them if they are ContextSettable
+   *
+   * @param theClass Class to instantiate
+   * @param configuration Giraph configuration, may be null
+   * @param context Mapper context
+   * @param <T> Type to instantiate
+   * @return Newly instantiated object with configuration and context set if
+   * possible
+   */
+  public static <T> T newInstance(
+      Class<T> theClass,
+      ImmutableClassesGiraphConfiguration configuration,
+      Mapper<?, ?, ?, ?>.Context context) {
+    T result = newInstance(theClass, configuration);
+    if (result instanceof ContextSettable) {
+      ((ContextSettable) result).setContext(context);
+    }
+    return result;
+  }
+
+  /**
    * Verify that found type matches the expected type. If types don't match an
    * {@link IllegalStateException} will be thrown.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index b008f28..cdb9b7e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -241,7 +241,7 @@ public class BspServiceWorker<I extends WritableComparable,
     if (conf.isReactiveJmapHistogramDumpEnabled()) {
       conf.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
     }
-    observers = conf.createWorkerObservers();
+    observers = conf.createWorkerObservers(context);
 
     WorkerProgress.get().setTaskId(getTaskPartition());
     workerProgressWriter = conf.trackJobProgressOnClient() ?

http://git-wip-us.apache.org/repos/asf/giraph/blob/693a71c8/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
index b1b40db..88fcb80 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java
@@ -24,6 +24,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
  * Observer for worker. The user can subclass and register an observer with the
  * Giraph framework. The framework will execute methods of the observer at
  * designated moments of computation on each worker.
+ * It can implement ContextSettable if it needs to access job counters.
  */
 public interface WorkerObserver extends ImmutableClassesGiraphConfigurable {
   /**