Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/08/27 14:53:59 UTC

hadoop git commit: MAPREDUCE-6861. Add metrics tags for ShuffleClientMetrics. (Contributed by Zoltan Siegl)

Repository: hadoop
Updated Branches:
  refs/heads/trunk a813fd021 -> 84973d104


MAPREDUCE-6861. Add metrics tags for ShuffleClientMetrics. (Contributed by Zoltan Siegl)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84973d10
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84973d10
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84973d10

Branch: refs/heads/trunk
Commit: 84973d104917c0b8cbb187ee4f9868bbce967728
Parents: a813fd0
Author: Haibo Chen <ha...@apache.org>
Authored: Mon Aug 27 16:53:06 2018 +0200
Committer: Haibo Chen <ha...@apache.org>
Committed: Mon Aug 27 16:53:06 2018 +0200

----------------------------------------------------------------------
 .../hadoop/mapreduce/task/reduce/Shuffle.java   | 24 ++++---
 .../task/reduce/ShuffleClientMetrics.java       | 43 ++++++++++-
 .../task/reduce/TestShuffleClientMetrics.java   | 75 ++++++++++++++++++++
 3 files changed, 129 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
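
The patch threads the reduce task attempt id and the job configuration into ShuffleClientMetrics.create() so that the metrics record is tagged with user, jobName, jobId and taskId. The snippet below is a minimal illustrative sketch (not part of the patch) of how the new factory could be exercised; the class name, user, job name and id values are made up, and the class is placed in the org.apache.hadoop.mapreduce.task.reduce package because getMetricsRegistry() is @VisibleForTesting and package-private.

package org.apache.hadoop.mapreduce.task.reduce;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;

public class ShuffleMetricsTagsExample {
  public static void main(String[] args) {
    // Hypothetical reduce attempt id (jtIdentifier, jobId, type, taskId, attemptId).
    TaskAttemptID reduceId =
        new TaskAttemptID("20180827165306", 1, TaskType.REDUCE, 0, 0);

    // Hypothetical job configuration supplying the user and job name tags.
    JobConf jobConf = new JobConf();
    jobConf.setUser("alice");
    jobConf.setJobName("example-job");

    // New in this patch: create() takes the reduce id and job conf and
    // tags the underlying MetricsRegistry with user/jobName/jobId/taskId.
    ShuffleClientMetrics metrics =
        ShuffleClientMetrics.create(reduceId, jobConf);

    // Read the tags back through the test-visible registry accessor.
    System.out.println(metrics.getMetricsRegistry().getTag("user").value());
    System.out.println(metrics.getMetricsRegistry().getTag("jobId").value());
    System.out.println(metrics.getMetricsRegistry().getTag("taskId").value());
  }
}

This mirrors what the new TestShuffleClientMetrics in the diff below asserts with mocked TaskAttemptID and JobConf objects.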


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84973d10/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index 3382bbf..1aad71d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -37,7 +37,8 @@ import org.apache.hadoop.util.Progress;
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>,
+    ExceptionReporter {
   private static final int PROGRESS_FREQUENCY = 2000;
   private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MIN_EVENTS_TO_FETCH = 100;
@@ -51,7 +52,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
   private ShuffleClientMetrics metrics;
   private TaskUmbilicalProtocol umbilical;
   
-  private ShuffleSchedulerImpl<K,V> scheduler;
+  private ShuffleSchedulerImpl<K, V> scheduler;
   private MergeManager<K, V> merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
@@ -68,7 +69,8 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     this.jobConf = context.getJobConf();
     this.umbilical = context.getUmbilical();
     this.reporter = context.getReporter();
-    this.metrics = ShuffleClientMetrics.create();
+    this.metrics = ShuffleClientMetrics.create(context.getReduceId(),
+        this.jobConf);
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
@@ -101,16 +103,16 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
 
     // Start the map-completion events fetcher thread
-    final EventFetcher<K,V> eventFetcher = 
-      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
-          maxEventsToFetch);
+    final EventFetcher<K, V> eventFetcher =
+        new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,
+            maxEventsToFetch);
     eventFetcher.start();
     
     // Start the map-output fetcher threads
     boolean isLocal = localMapFiles != null;
     final int numFetchers = isLocal ? 1 :
-      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
-    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
+        jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+    Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];
     if (isLocal) {
       fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
           merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
@@ -118,7 +120,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
       fetchers[0].start();
     } else {
       for (int i=0; i < numFetchers; ++i) {
-        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
+        fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger,
                                        reporter, metrics, this, 
                                        reduceTask.getShuffleSecret());
         fetchers[i].start();
@@ -141,7 +143,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     eventFetcher.shutDown();
     
     // Stop the map-output fetcher threads
-    for (Fetcher<K,V> fetcher : fetchers) {
+    for (Fetcher<K, V> fetcher : fetchers) {
       fetcher.shutDown();
     }
     
@@ -157,7 +159,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     try {
       kvIter = merger.close();
     } catch (Throwable e) {
-      throw new ShuffleError("Error while doing final merge " , e);
+      throw new ShuffleError("Error while doing final merge ", e);
     }
 
     // Sanity check

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84973d10/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
index d4e185d..d5e97aa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
@@ -17,24 +17,42 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
+
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
 
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Metric for Shuffle client.
+ */
+@SuppressWarnings("checkstyle:finalclass")
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
 @Metrics(name="ShuffleClientMetrics", context="mapred")
 public class ShuffleClientMetrics {
 
+  private static final MetricsInfo RECORD_INFO =
+      info("ShuffleClientMetrics", "Metrics for Shuffle client");
+
   @Metric
   private MutableCounterInt numFailedFetches;
   @Metric
@@ -44,14 +62,23 @@ public class ShuffleClientMetrics {
   @Metric
   private MutableGaugeInt numThreadsBusy;
 
+  private final MetricsRegistry metricsRegistry =
+      new MetricsRegistry(RECORD_INFO);
+
   private ShuffleClientMetrics() {
   }
 
-  public static ShuffleClientMetrics create() {
+  public static ShuffleClientMetrics create(
+      TaskAttemptID reduceId,
+      JobConf jobConf) {
     MetricsSystem ms = DefaultMetricsSystem.initialize("JobTracker");
+
+    ShuffleClientMetrics shuffleClientMetrics = new ShuffleClientMetrics();
+    shuffleClientMetrics.addTags(reduceId, jobConf);
+
     return ms.register("ShuffleClientMetrics-" +
         ThreadLocalRandom.current().nextInt(), null,
-        new ShuffleClientMetrics());
+            shuffleClientMetrics);
   }
 
   public void inputBytes(long bytes) {
@@ -69,4 +96,16 @@ public class ShuffleClientMetrics {
   public void threadFree() {
     numThreadsBusy.decr();
   }
+
+  private void addTags(TaskAttemptID reduceId, JobConf jobConf) {
+    metricsRegistry.tag("user", "", jobConf.getUser())
+        .tag("jobName", "", jobConf.getJobName())
+        .tag("jobId", "", reduceId.getJobID().toString())
+        .tag("taskId", "", reduceId.toString());
+  }
+
+  @VisibleForTesting
+  MetricsRegistry getMetricsRegistry() {
+    return metricsRegistry;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84973d10/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java
new file mode 100644
index 0000000..0baf52f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleClientMetrics.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link ShuffleClientMetrics}.
+ */
+public class TestShuffleClientMetrics {
+
+  private static final String TEST_JOB_NAME = "Test job name";
+  private static final String TEST_JOB_ID = "Test job id";
+  private static final String TEST_TASK_ID = "Test task id";
+  private static final String TEST_USER_NAME = "Test user name";
+
+  @Test
+  public void testShuffleMetricsTags() {
+    // Set up
+    JobID jobID = mock(JobID.class);
+    when(jobID.toString()).thenReturn(TEST_JOB_ID);
+
+    TaskAttemptID reduceId = mock(TaskAttemptID.class);
+    when(reduceId.getJobID()).thenReturn(jobID);
+    when(reduceId.toString()).thenReturn(TEST_TASK_ID);
+
+    JobConf jobConf = mock(JobConf.class);
+    when(jobConf.getUser()).thenReturn(TEST_USER_NAME);
+    when(jobConf.getJobName()).thenReturn(TEST_JOB_NAME);
+
+    // Act
+    ShuffleClientMetrics shuffleClientMetrics =
+        ShuffleClientMetrics.create(reduceId, jobConf);
+
+    // Assert
+    MetricsTag userMetrics = shuffleClientMetrics.getMetricsRegistry()
+        .getTag("user");
+    assertEquals(TEST_USER_NAME, userMetrics.value());
+
+    MetricsTag jobNameMetrics = shuffleClientMetrics.getMetricsRegistry()
+        .getTag("jobName");
+    assertEquals(TEST_JOB_NAME, jobNameMetrics.value());
+
+    MetricsTag jobIdMetrics = shuffleClientMetrics.getMetricsRegistry()
+        .getTag("jobId");
+    assertEquals(TEST_JOB_ID, jobIdMetrics.value());
+
+    MetricsTag taskIdMetrics = shuffleClientMetrics.getMetricsRegistry()
+        .getTag("taskId");
+    assertEquals(TEST_TASK_ID, taskIdMetrics.value());
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org