You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/10 19:00:43 UTC

git commit: TEZ-918. Fix a bug which could cause shuffle to hang if there are intermittent fetch failures. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 1b170a4ec -> 98b49f227


TEZ-918. Fix a bug which could cause shuffle to hang if there are intermittent fetch failures.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/98b49f22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/98b49f22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/98b49f22

Branch: refs/heads/master
Commit: 98b49f22790e5116e1fd48fa96b3b01c2359ffaf
Parents: 1b170a4
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 10 11:00:11 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 10 11:00:11 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  2 +-
 .../library/common/shuffle/impl/Fetcher.java    | 84 ++++++++++--------
 .../library/common/shuffle/impl/Shuffle.java    |  4 +
 .../common/shuffle/impl/ShuffleScheduler.java   | 89 ++++++++++++--------
 4 files changed, 107 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index a3d6561..b34eaf2 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -194,7 +194,7 @@ public class TezJobConfig {
    */
   public static final String TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES = 
       "tez.runtime.shuffle.fetch.failures.limit";
-  public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = 10;
+  public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT = 5;
 
   /**
    * 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 97b57f3..a81b83e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -76,6 +76,7 @@ class Fetcher extends Thread {
   private final Shuffle shuffle;
   private final int id;
   private static int nextId = 0;
+  private int currentPartition = -1;
   
   private final int connectionTimeout;
   private final int readTimeout;
@@ -87,8 +88,6 @@ class Fetcher extends Thread {
   private final SecretKey jobTokenSecret;
 
   private volatile boolean stopped = false;
-
-  private Configuration job;
   
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
@@ -102,7 +101,6 @@ class Fetcher extends Thread {
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
       Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
-    this.job = job;
     this.scheduler = scheduler;
     this.merger = merger;
     this.metrics = metrics;
@@ -228,6 +226,7 @@ class Fetcher extends Thread {
   protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+    currentPartition = host.getPartitionId();
     
     // Sanity check to catch hosts with only 'OBSOLETE' maps, 
     // especially at the tail of large jobs
@@ -237,7 +236,7 @@ class Fetcher extends Thread {
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + srcAttempts);
+        + srcAttempts + ", partitionId: " + currentPartition);
     }
     
     // List of maps to be fetched yet
@@ -294,31 +293,28 @@ class Fetcher extends Thread {
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
       ioErrs.increment(1);
-      LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 
-               " map outputs", ie);
-
-      // If connect did not succeed, just mark all the maps as failed,
-      // indirectly penalizing the host
       if (!connectSucceeded) {
-        for(InputAttemptIdentifier left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded);
-        }
-        remaining.clear();
+        LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
+        connectionErrs.increment(1);
       } else {
-        // If we got a read error at this stage, it implies there was a problem
-        // with the first map, typically lost map. So, penalize only that map
-        // and add the rest
-        InputAttemptIdentifier firstMap = srcAttempts.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded);
-        remaining.remove(firstMap);
+        LOG.warn("Failed to verify reply after connecting to " + host + " with " + remaining.size()
+          + " inputs pending", ie);
       }
+
+      // At this point, either the connection failed, or the initial header verification failed.
+      // The error does not relate to any specific Input. Report all of them as failed.
       
-      // Add back all the remaining maps, WITHOUT marking them as failed
+      // This ends up indirectly penalizing the host (multiple failures reported on the single host)
+
       for(InputAttemptIdentifier left: remaining) {
-        // TODO Should the first one be skipped ?
-        scheduler.putBackKnownMapOutput(host, left);
+        // Need to be handling temporary glitches .. 
+        scheduler.copyFailed(left, host, !connectSucceeded);
       }
-      
+
+      // Add back all remaining maps - which at this point is ALL MAPS the
+      // Fetcher was started with. The Scheduler takes care of retries,
+      // reporting too many failures etc.
+      putBackRemainingMapOutputs(host);
       return;
     }
     
@@ -339,7 +335,6 @@ class Fetcher extends Thread {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
         for(InputAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true);
-          remaining.remove(left);
         }
       }
       
@@ -351,12 +346,27 @@ class Fetcher extends Thread {
             + remaining.size() + " left.");
       }
     } finally {
-      for (InputAttemptIdentifier left : remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
+      putBackRemainingMapOutputs(host);
     }
   }
   
+  private void putBackRemainingMapOutputs(MapHost host) {
+    // Cycle through remaining MapOutputs
+    boolean isFirst = true;
+    InputAttemptIdentifier first = null;
+    for (InputAttemptIdentifier left : remaining) {
+      if (isFirst) {
+        first = left;
+        isFirst = false;
+        continue;
+      }
+      scheduler.putBackKnownMapOutput(host, left);
+    }
+    if (first != null) { // Empty remaining list.
+      scheduler.putBackKnownMapOutput(host, first);
+    }
+  }
+
   private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
   
   private InputAttemptIdentifier[] copyMapOutput(MapHost host,
@@ -388,7 +398,7 @@ class Fetcher extends Thread {
         LOG.warn("Invalid map id ", e);
         // Don't know which one was bad, so consider this one bad and dont read
         // the remaining because we dont know where to start reading from. YARN-1773
-        return new InputAttemptIdentifier[] {srcAttemptId = getNextRemainingAttempt()};
+        return new InputAttemptIdentifier[] {getNextRemainingAttempt()};
       }
 
  
@@ -481,14 +491,16 @@ class Fetcher extends Thread {
                decompressedLength);
       return false;
     }
-    
-//    if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
-//      wrongReduceErrs.increment(1);
-//      LOG.warn(getName() + " data for the wrong reduce map: " +
-//               srcAttemptId + " len: " + compressedLength + " decomp len: " +
-//               decompressedLength + " for reduce " + forReduce);
-//      return false;
-//    }
+
+    // partitionId verification. The expected partition isn't available here because it is
+    // encoded into the URI; compare against the partition recorded from the host in copyFromHost.
+    if (forReduce != currentPartition) {
+      wrongReduceErrs.increment(1);
+      LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: "
+          + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce
+          + ", expected partition: " + currentPartition);
+      return false;
+    }
 
     // Sanity check
     if (!remaining.contains(srcAttemptId)) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 583e1a1..2b940e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -192,6 +192,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
     if (runShuffleFuture == null) {
       return false;
     }
+    // TODO This may return true, followed by the reader throwing the actual Exception.
+    // Fix as part of TEZ-919.
     return runShuffleFuture.isDone();
     //return scheduler.isDone() && merger.isMergeComplete();
   }
@@ -249,6 +251,8 @@ public class Shuffle implements ExceptionReporter, MemoryUpdateCallback {
       while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
         synchronized (this) {
           if (throwable != null) {
+            // This exception will show up when someone tries iterating through the fetched Input.
+            // As part of TEZ-919, report this early.
             throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                    throwable);
           }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98b49f22/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index b33b838..1b34430 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -58,7 +58,7 @@ class ShuffleScheduler {
 
   private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
   private static final int MAX_MAPS_AT_ONCE = 20;
-  private static final long INITIAL_PENALTY = 10000;
+  private static final long INITIAL_PENALTY = 2000l; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   
   // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
@@ -80,7 +80,6 @@ class ShuffleScheduler {
     new HashMap<String,IntWritable>();
   private final TezInputContext inputContext;
   private final Shuffle shuffle;
-  private final int abortFailureLimit;
   private final TezCounter shuffledMapsCounter;
   private final TezCounter reduceShuffleBytes;
   private final TezCounter reduceBytesDecompressed;
@@ -90,15 +89,15 @@ class ShuffleScheduler {
   
   private final long startTime;
   private long lastProgressTime;
-  
-  private int maxMapRuntime = 0;
-  private int maxFailedUniqueFetches = 5;
+
   private int maxFetchFailuresBeforeReporting;
-  
+  private boolean reportReadErrorImmediately = true; 
+  private int maxFailedUniqueFetches = 5;
+  private final int abortFailureLimit;
+  private int maxMapRuntime = 0;
+
   private long totalBytesShuffledTillNow = 0;
   private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-
-  private boolean reportReadErrorImmediately = true;
   
   public ShuffleScheduler(TezInputContext inputContext,
                           Configuration conf,
@@ -124,6 +123,7 @@ class ShuffleScheduler {
     this.bytesShuffledToMem = bytesShuffledToMem;
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
+
     this.maxFailedUniqueFetches = Math.min(numberOfInputs,
         this.maxFailedUniqueFetches);
     referee.start();
@@ -135,6 +135,14 @@ class ShuffleScheduler {
         conf.getBoolean(
             TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+    
+    LOG.info("ShuffleScheduler running for sourceVertex: "
+        + inputContext.getSourceVertexName() + " with configuration: "
+        + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
+        + ", reportReadErrorImmediately=" + reportReadErrorImmediately
+        + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches
+        + ", abortFailureLimit=" + abortFailureLimit
+        + ", maxMapRuntime=" + maxMapRuntime);
   }
 
   public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, 
@@ -200,6 +208,8 @@ class ShuffleScheduler {
       failureCounts.put(srcAttempt, new IntWritable(1));      
     }
     String hostname = host.getHostName();
+    // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
+    // reporting / potential blacklisting of hosts.
     if (hostFailures.containsKey(hostname)) {
       IntWritable x = hostFailures.get(hostname);
       x.set(x.get() + 1);
@@ -207,6 +217,13 @@ class ShuffleScheduler {
       hostFailures.put(hostname, new IntWritable(1));
     }
     if (failures >= abortFailureLimit) {
+      // This task has seen too many fetch failures - report it as failed. The
+      // AM may retry it if max failures has not been reached.
+      
+      // Between the task and the AM - someone needs to determine who is at
+      // fault. If there's enough errors seen on the task, before the AM informs
+      // it about source failure, the task considers itself to have failed and
+      // allows the AM to re-schedule it.
       IOException ioe = new IOException(failures
             + " failures downloading "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
@@ -215,7 +232,8 @@ class ShuffleScheduler {
       ioe.fillInStackTrace();
       shuffle.reportException(ioe);
     }
-    
+
+    failedShuffleCounter.increment(1);
     checkAndInformAM(failures, srcAttempt, readError);
 
     checkReducerHealth();
@@ -223,12 +241,10 @@ class ShuffleScheduler {
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
     
-    penalties.add(new Penalty(host, delay));
-    
-    failedShuffleCounter.increment(1);
+    penalties.add(new Penalty(host, delay));    
   }
   
-  // Notify the JobTracker  
+  // Notify the AM  
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformAM(
@@ -248,10 +264,9 @@ class ShuffleScheduler {
           .getInputIndex(), srcAttempt.getAttemptNumber()));
 
       inputContext.sendEvents(failedEvents);      
-      //status.addFailedDependency(mapId);
     }
   }
-    
+
   private void checkReducerHealth() {
     final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
     final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
@@ -293,8 +308,10 @@ class ShuffleScheduler {
         failureCounts.size() == (numInputs - doneMaps))
         && !reducerHealthy
         && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.fatal("Shuffle failed with too many fetch failures " +
-      "and insufficient progress!");
+      LOG.fatal("Shuffle failed with too many fetch failures " + "and insufficient progress!"
+          + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps)
+          + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough="
+          + reducerProgressedEnough + ", reducerStalled=" + reducerStalled);
       String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
       shuffle.reportException(new IOException(errorMsg));
     }
@@ -368,23 +385,32 @@ class ShuffleScheduler {
   
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
     List<InputAttemptIdentifier> origList = host.getAndClearKnownMaps();
+
     Map<Integer, InputAttemptIdentifier> dedupedList = new LinkedHashMap<Integer, InputAttemptIdentifier>();
     Iterator<InputAttemptIdentifier> listItr = origList.iterator();
     while (listItr.hasNext()) {
-      // we may want to try all versions of the input but with current retry 
+      // we may want to try all versions of the input but with current retry
       // behavior older ones are likely to be lost and should be ignored.
       // This may be removed after TEZ-914
       InputAttemptIdentifier id = listItr.next();
-      Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
-      InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
-      if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
-        dedupedList.put(inputNumber, id);
-        if (oldId != null) {
-          LOG.warn("Ignoring older source: " + oldId + 
-              " in favor of newer source: " + id);
+      if (inputShouldBeConsumed(id)) {
+        Integer inputNumber = new Integer(id.getInputIdentifier().getInputIndex());
+        InputAttemptIdentifier oldId = dedupedList.get(inputNumber);
+        if (oldId == null || oldId.getAttemptNumber() < id.getAttemptNumber()) {
+          dedupedList.put(inputNumber, id);
+          if (oldId != null) {
+            LOG.warn("Old Src for InputIndex: " + inputNumber + " with attemptNumber: "
+                + oldId.getAttemptNumber()
+                + " was not determined to be invalid. Ignoring it for now in favour of "
+                + id.getAttemptNumber());
+          }
         }
+      } else {
+        LOG.info("Ignoring finished or obsolete source: " + id);
       }
     }
+    
+    // Compute the final list, limited by MAX_MAPS_AT_ONCE
     List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
     int includedMaps = 0;
     int totalSize = dedupedList.size();
@@ -392,23 +418,16 @@ class ShuffleScheduler {
     // find the maps that we still need, up to the limit
     while (dedupedItr.hasNext()) {
       InputAttemptIdentifier id = dedupedItr.next().getValue();
-      if (inputShouldBeConsumed(id)) {
         result.add(id);
         if (++includedMaps >= MAX_MAPS_AT_ONCE) {
           break;
         }
-      } else {
-        LOG.info("Ignoring finished or obsolete source: " + id);
-      }
     }
+
     // put back the maps left after the limit
     while (dedupedItr.hasNext()) {
       InputAttemptIdentifier id = dedupedItr.next().getValue();
-      if (inputShouldBeConsumed(id)) {
-        host.addKnownMap(id);
-      } else {
-        LOG.info("Ignoring finished or obsolete source: " + id);
-      }
+      host.addKnownMap(id);
     }
     LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
              host + " to " + Thread.currentThread().getName());
@@ -425,7 +444,7 @@ class ShuffleScheduler {
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
              (System.currentTimeMillis()-shuffleStart.get()) + "s");
   }
-    
+
   public synchronized void resetKnownMaps() {
     mapLocations.clear();
     obsoleteInputs.clear();