You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/03/13 23:29:58 UTC

git commit: updated refs/heads/trunk to 8e6ec26

Repository: giraph
Updated Branches:
  refs/heads/trunk d86d0d56e -> 8e6ec2661


GIRAPH-1174

closes #62


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8e6ec266
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8e6ec266
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8e6ec266

Branch: refs/heads/trunk
Commit: 8e6ec266190d59558f369f808c53cf9186887ce5
Parents: d86d0d5
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Mar 9 14:03:41 2018 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Mar 13 16:29:02 2018 -0700

----------------------------------------------------------------------
 .../apache/giraph/conf/GiraphConfiguration.java | 20 ++++++++++++
 .../org/apache/giraph/conf/GiraphConstants.java |  5 +++
 .../ImmutableClassesGiraphConfiguration.java    | 17 ++++++++++
 .../apache/giraph/graph/GraphTaskManager.java   |  5 +++
 .../org/apache/giraph/utils/GcObserver.java     | 33 ++++++++++++++++++++
 5 files changed, 80 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ffed2e0..e269de4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -51,6 +51,7 @@ import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.ReusesObjectsPartition;
+import org.apache.giraph.utils.GcObserver;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
@@ -324,6 +325,16 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Add a GcObserver class to this configuration (optional).
+   *
+   * @param gcObserverClass GcObserver class to add to GC_OBSERVER_CLASSES.
+   */
+  public final void addGcObserverClass(
+      Class<? extends GcObserver> gcObserverClass) {
+    GC_OBSERVER_CLASSES.add(this, gcObserverClass);
+  }
+
+  /**
    * Get job observer class
    *
    * @return GiraphJobObserver class set.
@@ -707,6 +718,15 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get the GcObserver classes held by GC_OBSERVER_CLASSES for this conf.
+   *
+   * @return array of GcObserver classes read from this configuration.
+   */
+  public Class<? extends GcObserver>[] getGcObserverClasses() {
+    return GC_OBSERVER_CLASSES.getArray(this);
+  }
+
+  /**
    * Whether to track, print, and aggregate metrics.
    *
    * @return true if metrics are enabled, false otherwise (default)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 44b2a44..db13670 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -84,6 +84,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.SimplePartition;
+import org.apache.giraph.utils.GcObserver;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
@@ -229,6 +230,10 @@ public interface GiraphConstants {
   ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES =
       ClassConfOption.create("giraph.mapper.observers", null,
           MapperObserver.class, "Classes for Mapper Observer - optional");
+  /** Classes for GC Observer - optional */
+  ClassConfOption<GcObserver> GC_OBSERVER_CLASSES =
+      ClassConfOption.create("giraph.gc.observers", null,
+          GcObserver.class, "Classes for GC Observer - optional");
   /** Message combiner class - optional */
   ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
       ClassConfOption.create("giraph.messageCombinerClass", null,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 680368b..dfd24d0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -76,6 +76,7 @@ import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 import org.apache.giraph.utils.ExtendedDataInput;
 import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.GcObserver;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
@@ -787,6 +788,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create one GcObserver instance per class from getGcObserverClasses().
+   *
+   * @param context Mapper context passed to ReflectionUtils.newInstance
+   * @return Instantiated GcObservers, in the same order as their classes.
+   */
+  public GcObserver[] createGcObservers(
+      Mapper<?, ?, ?, ?>.Context context) {
+    Class<? extends GcObserver>[] klasses = getGcObserverClasses();
+    GcObserver[] objects = new GcObserver[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
+    }
+    return objects;
+  }
+
+  /**
    * Create job observer
    *
    * @return GiraphJobObserver set in configuration.

http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index b0659bf..08b45a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -58,6 +58,7 @@ import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.GcObserver;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.worker.BspServiceWorker;
@@ -650,6 +651,7 @@ end[PURE_YARN]*/
    * notifies an out-of-core engine (if any is used) about the GC.
    */
   private void installGCMonitoring() {
+    final GcObserver[] gcObservers = conf.createGcObservers(context);
     List<GarbageCollectorMXBean> mxBeans = ManagementFactory
         .getGarbageCollectorMXBeans();
     final OutOfCoreEngine oocEngine =
@@ -674,6 +676,9 @@ end[PURE_YARN]*/
             }
             gcTimeMetric.inc(info.getGcInfo().getDuration());
             GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
+            for (GcObserver gcObserver : gcObservers) {
+              gcObserver.gcOccurred(info);
+            }
             if (oocEngine != null) {
               oocEngine.gcCompleted(info);
             }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java b/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java
new file mode 100644
index 0000000..3337e42
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+
+/**
+ * Observer that is notified whenever a garbage collection occurs.
+ */
+public interface GcObserver {
+  /**
+   * Called to notify this observer that a garbage collection occurred.
+   *
+   * @param gcInfo Notification info describing the GC that occurred
+   */
+  void gcOccurred(GarbageCollectionNotificationInfo gcInfo);
+}