You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/03/28 01:54:06 UTC
tez git commit: TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)
Repository: tez
Updated Branches:
refs/heads/master 2bdf58aa9 -> f7f60385c
TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7f60385
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7f60385
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7f60385
Branch: refs/heads/master
Commit: f7f60385cf5d8d66a1ee02e64d1e671bf1ad8771
Parents: 2bdf58a
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Mon Mar 27 18:53:45 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Mon Mar 27 18:53:45 2017 -0700
----------------------------------------------------------------------
.../orderedgrouped/FetcherOrderedGrouped.java | 10 ---
.../orderedgrouped/ShuffleClientMetrics.java | 92 --------------------
.../orderedgrouped/ShuffleScheduler.java | 6 +-
.../shuffle/orderedgrouped/TestFetcher.java | 27 ++----
4 files changed, 10 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 58ca1e2..5cad6fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -69,7 +69,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final TezCounter wrongReduceErrs;
private final FetchedInputAllocatorOrderedGrouped allocator;
private final ShuffleScheduler scheduler;
- private final ShuffleClientMetrics metrics;
private final ExceptionReporter exceptionReporter;
private final int id;
private final String logIdentifier;
@@ -107,7 +106,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams,
ShuffleScheduler scheduler,
FetchedInputAllocatorOrderedGrouped allocator,
- ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, JobTokenSecretManager jobTokenSecretMgr,
boolean ifileReadAhead, int ifileReadAheadLength,
CompressionCodec codec,
@@ -130,7 +128,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
boolean verifyDiskChecksum) {
this.scheduler = scheduler;
this.allocator = allocator;
- this.metrics = metrics;
this.exceptionReporter = exceptionReporter;
this.mapHost = mapHost;
this.currentPartition = this.mapHost.getPartitionId();
@@ -169,8 +166,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
@VisibleForTesting
protected void fetchNext() throws InterruptedException, IOException {
try {
- metrics.threadBusy();
-
if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) {
setupLocalDiskFetch(mapHost);
} else {
@@ -180,7 +175,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
} finally {
cleanupCurrentConnection(false);
scheduler.freeHost(mapHost);
- metrics.threadFree();
}
}
@@ -524,7 +518,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
endTime - startTime, mapOutput, false);
// Note successful shuffle
remaining.remove(srcAttemptId.toString());
- metrics.successFetch();
return null;
} catch (IOException ioe) {
if (stopped) {
@@ -562,7 +555,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
// Inform the shuffle-scheduler
mapOutput.abort();
- metrics.failedFetch();
return new InputAttemptIdentifier[] {srcAttemptId};
}
}
@@ -685,13 +677,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
iter.remove();
- metrics.successFetch();
} catch (IOException e) {
if (mapOutput != null) {
mapOutput.abort();
}
if (!stopped) {
- metrics.failedFetch();
ioErrs.increment(1);
scheduler.copyFailed(srcAttemptId, host, true, false, true);
LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
deleted file mode 100644
index f297dad..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-
-class ShuffleClientMetrics implements Updater {
-
- private MetricsRecord shuffleMetrics = null;
- private int numFailedFetches = 0;
- private int numSuccessFetches = 0;
- private long numBytes = 0;
- private int numThreadsBusy = 0;
- private final int numCopiers;
-
- ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf,
- String user) {
- this.numCopiers =
- conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT);
-
- MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ);
- this.shuffleMetrics =
- MetricsUtil.createRecord(metricsContext, "shuffleInput");
- this.shuffleMetrics.setTag("user", user);
- this.shuffleMetrics.setTag("dagName", dagName);
- this.shuffleMetrics.setTag("taskId", TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex));
- this.shuffleMetrics.setTag("sessionId",
- conf.get(
- TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID,
- TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT));
- metricsContext.registerUpdater(this);
- }
- public synchronized void inputBytes(long numBytes) {
- this.numBytes += numBytes;
- }
- public synchronized void failedFetch() {
- ++numFailedFetches;
- }
- public synchronized void successFetch() {
- ++numSuccessFetches;
- }
- public synchronized void threadBusy() {
- ++numThreadsBusy;
- }
- public synchronized void threadFree() {
- --numThreadsBusy;
- }
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
- shuffleMetrics.incrMetric("shuffle_failed_fetches",
- numFailedFetches);
- shuffleMetrics.incrMetric("shuffle_success_fetches",
- numSuccessFetches);
- if (numCopiers != 0) {
- shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
- 100*((float)numThreadsBusy/numCopiers));
- } else {
- shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
- }
- numBytes = 0;
- numSuccessFetches = 0;
- numFailedFetches = 0;
- }
- shuffleMetrics.update();
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index cce486c..953c73e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -204,7 +204,6 @@ class ShuffleScheduler {
private final HttpConnectionParams httpConnectionParams;
private final FetchedInputAllocatorOrderedGrouped allocator;
- private final ShuffleClientMetrics shuffleMetrics;
private final ExceptionReporter exceptionReporter;
private final MergeManager mergeManager;
private final JobTokenSecretManager jobTokenSecretManager;
@@ -370,9 +369,6 @@ class ShuffleScheduler {
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
- this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
- inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
- this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
SecretKey jobTokenSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
@@ -1391,7 +1387,7 @@ class ShuffleScheduler {
@VisibleForTesting
FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) {
return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator,
- shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
+ exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle,
http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 310f1b2..a9b57a9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -110,7 +110,6 @@ public class TestFetcher {
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
MergeManager merger = mock(MergeManager.class);
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
InputContext inputContext = mock(InputContext.class);
@@ -124,7 +123,7 @@ public class TestFetcher {
doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost);
FetcherOrderedGrouped fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -142,7 +141,6 @@ public class TestFetcher {
Configuration conf = new TezConfiguration();
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
MergeManager merger = mock(MergeManager.class);
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
InputContext inputContext = mock(InputContext.class);
@@ -153,7 +151,7 @@ public class TestFetcher {
final boolean DISABLE_LOCAL_FETCH = false;
MapHost mapHost = new MapHost(HOST, PORT, 0);
FetcherOrderedGrouped fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -171,7 +169,7 @@ public class TestFetcher {
// if hostname does not match use http
mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -187,7 +185,7 @@ public class TestFetcher {
// if port does not match use http
mapHost = new MapHost(HOST, PORT + 1, 0);
fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -202,7 +200,7 @@ public class TestFetcher {
//if local fetch is not enabled
mapHost = new MapHost(HOST, PORT, 0);
- fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@ -221,14 +219,13 @@ public class TestFetcher {
Configuration conf = new TezConfiguration();
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
MergeManager merger = mock(MergeManager.class);
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
InputContext inputContext = mock(InputContext.class);
when(inputContext.getCounters()).thenReturn(new TezCounters());
when(inputContext.getSourceVertexName()).thenReturn("");
MapHost host = new MapHost(HOST, PORT, 1);
- FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
false, false, true);
@@ -300,9 +297,6 @@ public class TestFetcher {
verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true);
verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true);
- verify(metrics, times(3)).successFetch();
- verify(metrics, times(2)).failedFetch();
-
verify(spyFetcher).putBackRemainingMapOutputs(host);
verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
@@ -364,7 +358,6 @@ public class TestFetcher {
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
MergeManager merger = mock(MergeManager.class);
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
InputContext inputContext = mock(InputContext.class);
when(inputContext.getCounters()).thenReturn(new TezCounters());
@@ -373,7 +366,7 @@ public class TestFetcher {
HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
final MapHost host = new MapHost(HOST, PORT, 1);
- FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
false, false, true);
@@ -449,7 +442,6 @@ public class TestFetcher {
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
MergeManager merger = mock(MergeManager.class);
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
TezCounters counters = new TezCounters();
@@ -463,7 +455,7 @@ public class TestFetcher {
HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
final MapHost host = new MapHost(HOST, PORT, 1);
FetcherOrderedGrouped mockFetcher =
- new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr,
+ new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr,
false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
@@ -528,11 +520,10 @@ public class TestFetcher {
Configuration conf = new TezConfiguration();
ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
MergeManager merger = mock(MergeManager.class);
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
Shuffle shuffle = mock(Shuffle.class);
MapHost mapHost = new MapHost(HOST, PORT, 0);
FetcherOrderedGrouped fetcher =
- new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+ new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,