You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 19:48:28 UTC

[47/50] [abbrv] tez git commit: TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)

TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7f60385
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7f60385
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7f60385

Branch: refs/heads/TEZ-1190
Commit: f7f60385cf5d8d66a1ee02e64d1e671bf1ad8771
Parents: 2bdf58a
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Mar 27 18:53:45 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Mar 27 18:53:45 2017 -0700

----------------------------------------------------------------------
 .../orderedgrouped/FetcherOrderedGrouped.java   | 10 ---
 .../orderedgrouped/ShuffleClientMetrics.java    | 92 --------------------
 .../orderedgrouped/ShuffleScheduler.java        |  6 +-
 .../shuffle/orderedgrouped/TestFetcher.java     | 27 ++----
 4 files changed, 10 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 58ca1e2..5cad6fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -69,7 +69,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   private final TezCounter wrongReduceErrs;
   private final FetchedInputAllocatorOrderedGrouped allocator;
   private final ShuffleScheduler scheduler;
-  private final ShuffleClientMetrics metrics;
   private final ExceptionReporter exceptionReporter;
   private final int id;
   private final String logIdentifier;
@@ -107,7 +106,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
                                ShuffleScheduler scheduler,
                                FetchedInputAllocatorOrderedGrouped allocator,
-                               ShuffleClientMetrics metrics,
                                ExceptionReporter exceptionReporter, JobTokenSecretManager jobTokenSecretMgr,
                                boolean ifileReadAhead, int ifileReadAheadLength,
                                CompressionCodec codec,
@@ -130,7 +128,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                                boolean verifyDiskChecksum) {
     this.scheduler = scheduler;
     this.allocator = allocator;
-    this.metrics = metrics;
     this.exceptionReporter = exceptionReporter;
     this.mapHost = mapHost;
     this.currentPartition = this.mapHost.getPartitionId();
@@ -169,8 +166,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
   @VisibleForTesting
   protected void fetchNext() throws InterruptedException, IOException {
     try {
-      metrics.threadBusy();
-
       if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) {
         setupLocalDiskFetch(mapHost);
       } else {
@@ -180,7 +175,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
     } finally {
       cleanupCurrentConnection(false);
       scheduler.freeHost(mapHost);
-      metrics.threadFree();
     }
   }
 
@@ -524,7 +518,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
                               endTime - startTime, mapOutput, false);
       // Note successful shuffle
       remaining.remove(srcAttemptId.toString());
-      metrics.successFetch();
       return null;
     } catch (IOException ioe) {
       if (stopped) {
@@ -562,7 +555,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      metrics.failedFetch();
       return new InputAttemptIdentifier[] {srcAttemptId};
     }
   }
@@ -685,13 +677,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
           scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
               indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
           iter.remove();
-          metrics.successFetch();
         } catch (IOException e) {
           if (mapOutput != null) {
             mapOutput.abort();
           }
           if (!stopped) {
-            metrics.failedFetch();
             ioErrs.increment(1);
             scheduler.copyFailed(srcAttemptId, host, true, false, true);
             LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +

http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
deleted file mode 100644
index f297dad..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-
-class ShuffleClientMetrics implements Updater {
-
-  private MetricsRecord shuffleMetrics = null;
-  private int numFailedFetches = 0;
-  private int numSuccessFetches = 0;
-  private long numBytes = 0;
-  private int numThreadsBusy = 0;
-  private final int numCopiers;
-  
-  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, 
-      String user) {
-    this.numCopiers = 
-        conf.getInt(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, 
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
-
-    MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
-    this.shuffleMetrics = 
-      MetricsUtil.createRecord(metricsContext, "shuffleInput");
-    this.shuffleMetrics.setTag("user", user);
-    this.shuffleMetrics.setTag("dagName", dagName);
-    this.shuffleMetrics.setTag("taskId", TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex));
-    this.shuffleMetrics.setTag("sessionId", 
-        conf.get(
-            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID,
-            TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT));
-    metricsContext.registerUpdater(this);
-  }
-  public synchronized void inputBytes(long numBytes) {
-    this.numBytes += numBytes;
-  }
-  public synchronized void failedFetch() {
-    ++numFailedFetches;
-  }
-  public synchronized void successFetch() {
-    ++numSuccessFetches;
-  }
-  public synchronized void threadBusy() {
-    ++numThreadsBusy;
-  }
-  public synchronized void threadFree() {
-    --numThreadsBusy;
-  }
-  public void doUpdates(MetricsContext unused) {
-    synchronized (this) {
-      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
-      shuffleMetrics.incrMetric("shuffle_failed_fetches", 
-                                numFailedFetches);
-      shuffleMetrics.incrMetric("shuffle_success_fetches", 
-                                numSuccessFetches);
-      if (numCopiers != 0) {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
-            100*((float)numThreadsBusy/numCopiers));
-      } else {
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
-      }
-      numBytes = 0;
-      numSuccessFetches = 0;
-      numFailedFetches = 0;
-    }
-    shuffleMetrics.update();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index cce486c..953c73e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -204,7 +204,6 @@ class ShuffleScheduler {
 
   private final HttpConnectionParams httpConnectionParams;
   private final FetchedInputAllocatorOrderedGrouped allocator;
-  private final ShuffleClientMetrics shuffleMetrics;
   private final ExceptionReporter exceptionReporter;
   private final MergeManager mergeManager;
   private final JobTokenSecretManager jobTokenSecretManager;
@@ -370,9 +369,6 @@ class ShuffleScheduler {
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
     this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
     this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
-    this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
-        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
-        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
     SecretKey jobTokenSecret = ShuffleUtils
         .getJobTokenSecretFromTokenBytes(inputContext
             .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
@@ -1391,7 +1387,7 @@ class ShuffleScheduler {
   @VisibleForTesting
   FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
     return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator,
-        shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
+        exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
         codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
         ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
         connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle,

http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 310f1b2..a9b57a9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -110,7 +110,6 @@ public class TestFetcher {
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
 
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
 
     InputContext inputContext = mock(InputContext.class);
@@ -124,7 +123,7 @@ public class TestFetcher {
     doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
 
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -142,7 +141,6 @@ public class TestFetcher {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
 
     InputContext inputContext = mock(InputContext.class);
@@ -153,7 +151,7 @@ public class TestFetcher {
     final boolean DISABLE_LOCAL_FETCH = false;
     MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -171,7 +169,7 @@ public class TestFetcher {
     // if hostname does not match use http
     mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
     fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -187,7 +185,7 @@ public class TestFetcher {
     // if port does not match use http
     mapHost = new MapHost(HOST, PORT + 1, 0);
     fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
             null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -202,7 +200,7 @@ public class TestFetcher {
 
     //if local fetch is not enabled
     mapHost = new MapHost(HOST, PORT, 0);
-    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+    fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
         null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
         wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -221,14 +219,13 @@ public class TestFetcher {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
 
     MapHost host = new MapHost(HOST, PORT, 1);
-    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
         null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
         false, false, true);
@@ -300,9 +297,6 @@ public class TestFetcher {
     verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true);
     verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true);
 
-    verify(metrics, times(3)).successFetch();
-    verify(metrics, times(2)).failedFetch();
-
     verify(spyFetcher).putBackRemainingMapOutputs(host);
     verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
     verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
@@ -364,7 +358,6 @@ public class TestFetcher {
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
 
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
@@ -373,7 +366,7 @@ public class TestFetcher {
 
     HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
     final MapHost host = new MapHost(HOST, PORT, 1);
-    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+    FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
         null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
         wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
         false, false, true);
@@ -449,7 +442,6 @@ public class TestFetcher {
 
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
 
     TezCounters counters = new TezCounters();
@@ -463,7 +455,7 @@ public class TestFetcher {
     HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
     final MapHost host = new MapHost(HOST, PORT, 1);
     FetcherOrderedGrouped mockFetcher =
-        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr,
+        new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr,
             false, 0,
             null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
@@ -528,11 +520,10 @@ public class TestFetcher {
     Configuration conf = new TezConfiguration();
     ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
     MergeManager merger = mock(MergeManager.class);
-    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     Shuffle shuffle = mock(Shuffle.class);
     MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped fetcher =
-        new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+        new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
             null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
             wrongLengthErrsCounter, badIdErrsCounter,
             wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,