You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2018/10/10 21:21:33 UTC
tez git commit: TEZ-3990. The number of shuffle penalties for a
host/inputAttemptIdentifier should be capped (Kuhu Shukla via jeagles)
Repository: tez
Updated Branches:
refs/heads/master 2f1738888 -> a83b1e9d4
TEZ-3990. The number of shuffle penalties for a host/inputAttemptIdentifier should be capped (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a83b1e9d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a83b1e9d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a83b1e9d
Branch: refs/heads/master
Commit: a83b1e9d467f5a74ed72da76b1de6e725cc33ebe
Parents: 2f17388
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 10 16:21:21 2018 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 10 16:21:21 2018 -0500
----------------------------------------------------------------------
.../library/api/TezRuntimeConfiguration.java | 10 ++++
.../orderedgrouped/ShuffleScheduler.java | 16 +++++--
.../orderedgrouped/TestShuffleScheduler.java | 50 ++++++++++++++++++++
3 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 23f1f9b..85c53a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -238,6 +238,15 @@ public class TezRuntimeConfiguration {
"shuffle.fetch.failures.limit";
public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5;
+ /**
+ * Specifies, in milliseconds, the maximum delay a penalized host can have before being retried.
+ * Defaults to 10 minutes.
+ */
+ @ConfigurationProperty(type = "integer")
+ public static final String TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS = TEZ_RUNTIME_PREFIX +
+ "shuffle.host.penalty.time.limit";
+ public static final int TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT = 600000;
+
@Private
@Unstable
@ConfigurationProperty(type = "integer")
@@ -609,6 +618,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
defaultConf.addResource("core-default.xml");
defaultConf.addResource("core-site.xml");
http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 981e224..d847932 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -60,7 +60,6 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -245,6 +244,7 @@ class ShuffleScheduler {
private final boolean compositeFetch;
private volatile Thread shuffleSchedulerThread = null;
+ private final int maxPenaltyTime;
private long totalBytesShuffledTillNow = 0;
private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
@@ -417,6 +417,8 @@ class ShuffleScheduler {
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
+ this.maxPenaltyTime = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT);
pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();
LOG.info("ShuffleScheduler running for sourceVertex: "
@@ -831,7 +833,8 @@ class ShuffleScheduler {
long delay = (long) (INITIAL_PENALTY *
Math.pow(PENALTY_GROWTH_RATE, failures));
- penalties.add(new Penalty(host, delay));
+ long penaltyDelay = Math.min(delay, maxPenaltyTime);
+ penalties.add(new Penalty(host, penaltyDelay));
}
private int getFailureCount(InputAttemptIdentifier srcAttempt) {
@@ -1149,7 +1152,12 @@ class ShuffleScheduler {
String path, int reduceId) {
return pathToIdentifierMap.get(new PathPartition(path, reduceId));
}
-
+
+ @VisibleForTesting
+ DelayQueue<Penalty> getPenalties() { // test-only accessor: returns the scheduler's own penalty queue, not a copy
+ return penalties;
+ }
+
private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
boolean isInputFinished = false;
if (id instanceof CompositeInputAttemptIdentifier) {
@@ -1281,7 +1289,7 @@ class ShuffleScheduler {
/**
* A structure that records the penalty for a host.
*/
- private static class Penalty implements Delayed {
+ static class Penalty implements Delayed {
MapHost host;
private long endTime;
http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 381ad85..7a7b1ee 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +56,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -953,6 +955,54 @@ public class TestShuffleScheduler {
}
}
+ @Test (timeout = 120000)
+ public void testPenalties() throws Exception { // checks that no queued host penalty exceeds the configured limit
+ InputContext inputContext = createTezInputContext();
+ Configuration conf = new TezConfiguration();
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS, 20000); // cap penalties at 20000 ms instead of the 600000 ms default
+ int numInputs = 10;
+ Shuffle shuffle = mock(Shuffle.class);
+ MergeManager mergeManager = mock(MergeManager.class);
+
+ final ShuffleSchedulerForTest scheduler =
+ new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
+ mergeManager,
+ System.currentTimeMillis(), null, false, 0, "srcName");
+
+ ExecutorService executor = Executors.newFixedThreadPool(1); // NOTE(review): never shut down within this method
+
+ Future<Void> executorFuture = executor.submit(new Callable<Void>() { // NOTE(review): the returned future is never awaited or checked
+ @Override
+ public Void call() throws Exception {
+ scheduler.start(); // run on the executor thread; presumably start() blocks - TODO confirm
+ return null;
+ }
+ });
+
+ InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
+
+ for (int i = 0; i < numInputs; i++) { // register one map output per host name "host0".."host9"
+ CompositeInputAttemptIdentifier inputAttemptIdentifier =
+ new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
+ scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
+ identifiers[i] = inputAttemptIdentifier;
+ }
+
+ MapHost[] mapHosts = new MapHost[numInputs];
+ int count = 0;
+ for (MapHost mh : scheduler.mapLocations.values()) { // NOTE(review): iteration order decides mapHosts[0]; it may not be the host of identifiers[0] - confirm
+ mapHosts[count++] = mh;
+ }
+
+ for (int i = 0; i < 10; i++) { // report the same fetch failure ten times
+ scheduler.copyFailed(identifiers[0], mapHosts[0], false, false, false); // presumably each call enqueues a penalty whose uncapped delay grows with the failure count - verify
+ }
+ ShuffleScheduler.Penalty[] penaltyArray = new ShuffleScheduler.Penalty[scheduler.getPenalties().size()];
+ scheduler.getPenalties().toArray(penaltyArray); // snapshot the queued penalties so they can be inspected by index
+ for (int i = 0; i < penaltyArray.length; i++) {
+ Assert.assertTrue(penaltyArray[i].getDelay(TimeUnit.MILLISECONDS) <= 20000); // remaining delay must not exceed the 20000 ms limit set above
+ }
+ }
private InputContext createTezInputContext() throws IOException {
ApplicationId applicationId = ApplicationId.newInstance(1, 1);