You are viewing a plain text version of this content; the canonical (linked) version is available on the original mailing-list archive page.
Posted to commits@tez.apache.org by je...@apache.org on 2018/10/10 21:21:33 UTC

tez git commit: TEZ-3990. The number of shuffle penalties for a host/inputAttemptIdentifier should be capped (Kuhu Shukla via jeagles)

Repository: tez
Updated Branches:
  refs/heads/master 2f1738888 -> a83b1e9d4


TEZ-3990. The number of shuffle penalties for a host/inputAttemptIdentifier should be capped (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a83b1e9d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a83b1e9d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a83b1e9d

Branch: refs/heads/master
Commit: a83b1e9d467f5a74ed72da76b1de6e725cc33ebe
Parents: 2f17388
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Wed Oct 10 16:21:21 2018 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Wed Oct 10 16:21:21 2018 -0500

----------------------------------------------------------------------
 .../library/api/TezRuntimeConfiguration.java    | 10 ++++
 .../orderedgrouped/ShuffleScheduler.java        | 16 +++++--
 .../orderedgrouped/TestShuffleScheduler.java    | 50 ++++++++++++++++++++
 3 files changed, 72 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 23f1f9b..85c53a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -238,6 +238,15 @@ public class TezRuntimeConfiguration {
       "shuffle.fetch.failures.limit";
   public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5;
 
+  /**
+   * Specifies in milliseconds the maximum delay a penalized host can have before being retried,
+   * defaults to 10 minutes.
+   */
+  @ConfigurationProperty(type = "integer")
+  public static final String TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS = TEZ_RUNTIME_PREFIX +
+      "shuffle.host.penalty.time.limit";
+  public static final int TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT = 600000;
+
   @Private
   @Unstable
   @ConfigurationProperty(type = "integer")
@@ -609,6 +618,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 981e224..d847932 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -60,7 +60,6 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -245,6 +244,7 @@ class ShuffleScheduler {
   private final boolean compositeFetch;
 
   private volatile Thread shuffleSchedulerThread = null;
+  private final int maxPenaltyTime;
 
   private long totalBytesShuffledTillNow = 0;
   private final DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
@@ -417,6 +417,8 @@ class ShuffleScheduler {
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
     this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
     this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
+    this.maxPenaltyTime = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT);
 
     pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();
     LOG.info("ShuffleScheduler running for sourceVertex: "
@@ -831,7 +833,8 @@ class ShuffleScheduler {
 
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
-    penalties.add(new Penalty(host, delay));
+    long penaltyDelay = Math.min(delay, maxPenaltyTime);
+    penalties.add(new Penalty(host, penaltyDelay));
   }
 
   private int getFailureCount(InputAttemptIdentifier srcAttempt) {
@@ -1149,7 +1152,12 @@ class ShuffleScheduler {
       String path, int reduceId) {
     return pathToIdentifierMap.get(new PathPartition(path, reduceId));
   }
-  
+
+  @VisibleForTesting
+  DelayQueue<Penalty> getPenalties() {
+    return penalties;
+  }
+
   private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
     boolean isInputFinished = false;
     if (id instanceof CompositeInputAttemptIdentifier) {
@@ -1281,7 +1289,7 @@ class ShuffleScheduler {
   /**
    * A structure that records the penalty for a host.
    */
-  private static class Penalty implements Delayed {
+  static class Penalty implements Delayed {
     MapHost host;
     private long endTime;
     

http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
index 381ad85..7a7b1ee 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +56,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -953,6 +955,54 @@ public class TestShuffleScheduler {
     }
   }
 
+  @Test (timeout = 120000)
+  public void testPenalties() throws Exception {
+    InputContext inputContext = createTezInputContext();
+    Configuration conf = new TezConfiguration();
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS, 20000);
+    int numInputs = 10;
+    Shuffle shuffle = mock(Shuffle.class);
+    MergeManager mergeManager = mock(MergeManager.class);
+
+    final ShuffleSchedulerForTest scheduler =
+        new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager,
+            mergeManager,
+            System.currentTimeMillis(), null, false, 0, "srcName");
+
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+
+    Future<Void> executorFuture = executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        scheduler.start();
+        return null;
+      }
+    });
+
+    InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs];
+
+    for (int i = 0; i < numInputs; i++) {
+      CompositeInputAttemptIdentifier inputAttemptIdentifier =
+          new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1);
+      scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier);
+      identifiers[i] = inputAttemptIdentifier;
+    }
+
+    MapHost[] mapHosts = new MapHost[numInputs];
+    int count = 0;
+    for (MapHost mh : scheduler.mapLocations.values()) {
+      mapHosts[count++] = mh;
+    }
+
+    for (int i = 0; i < 10; i++) {
+      scheduler.copyFailed(identifiers[0], mapHosts[0], false, false, false);
+    }
+    ShuffleScheduler.Penalty[] penaltyArray = new ShuffleScheduler.Penalty[scheduler.getPenalties().size()];
+    scheduler.getPenalties().toArray(penaltyArray);
+    for (int i = 0; i < penaltyArray.length; i++) {
+      Assert.assertTrue(penaltyArray[i].getDelay(TimeUnit.MILLISECONDS) <= 20000);
+    }
+  }
 
   private InputContext createTezInputContext() throws IOException {
     ApplicationId applicationId = ApplicationId.newInstance(1, 1);