You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/22 01:51:56 UTC

git commit: TEZ-598. ShuffleHandler should handle invalid headers correctly. (sseth)

Updated Branches:
  refs/heads/master 3d646ab2a -> 6ccebe747


TEZ-598. ShuffleHandler should handle invalid headers correctly. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6ccebe74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6ccebe74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6ccebe74

Branch: refs/heads/master
Commit: 6ccebe747217adcef05a7137eb8844c11e8ae18c
Parents: 3d646ab
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Nov 21 16:51:39 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Nov 21 16:51:39 2013 -0800

----------------------------------------------------------------------
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 10 ++++----
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  4 ++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  3 ++-
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 11 +++++++++
 .../library/common/shuffle/impl/Fetcher.java    | 25 ++++++++++++++++----
 .../common/shuffle/impl/ShuffleScheduler.java   |  8 +++----
 .../shuffle/common/DiskFetchedInput.java        |  2 ++
 .../runtime/library/shuffle/common/Fetcher.java |  3 ---
 8 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index f4f32fb..7741b71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -187,15 +187,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         LOG.debug("Container with id: " + containerId + " asked for a task");
       }
       if (!registeredContainers.containsKey(containerId)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Container " + containerId + " is no longer registered");
-        }
-        if(context.getAllContainers().get(containerId) == null)
+        if(context.getAllContainers().get(containerId) == null) {
           LOG.info("Container with id: " + containerId
               + " is invalid and will be killed");
-        else
+        } else {
           LOG.info("Container with id: " + containerId
-              + " is valid and will be killed");
+              + " is valid, but no longer registered, and will be killed");
+        }
         task = TASK_FOR_INVALID_JVM;
       } else {
         pingContainerHeartbeatHandler(containerId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 8a587c1..76c36c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -307,6 +307,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
+    // TODO Avoid reading this from configuration for each task.
     maxAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS,
                               TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT);
     taskId = new TezTaskID(vertexId, taskIndex);
@@ -1048,6 +1049,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // we don't need a new event if we already have a spare
         if (--task.numberUncompletedAttempts == 0
             && task.successfulAttempt == null) {
+          LOG.info("Scheduling new attempt for task: " + task.getTaskId()
+              + ", currentFailedAttempts: " + task.failedAttempts + ", maxAttempts: "
+              + task.maxAttempts);
           task.addAndScheduleAttempt();
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index cce2043..3eadded 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1727,7 +1727,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
       boolean forceTransitionToKillWait = false;
       vertex.completedTaskCount++;
-      LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
+      LOG.info("Num completed Tasks for " + vertex.getVertexId() + " [" + vertex.getName() + "] : "
+          + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());
       if (taskEvent.getState() == TaskState.SUCCEEDED) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 6eedb15..d8d2de3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -446,6 +446,14 @@ public class TaskScheduler extends AbstractService
     }
     Map<CookieContainerRequest, Container> assignedContainers;
 
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      for (Container container: containers) {
+        sb.append(container.getId()).append(", ");
+      }
+      LOG.debug("Assigned New Containers: " + sb.toString());
+    }
+
     synchronized (this) {
       if (!shouldReuseContainers) {
         List<Container> modifiableContainerList = Lists.newLinkedList(containers);
@@ -1423,6 +1431,9 @@ public class TaskScheduler extends AbstractService
           if (heldContainer == null) {
             continue;
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Considering HeldContainer: " + heldContainer + " for assignment");
+          }
           long currentTs = System.currentTimeMillis();
           long nextScheduleTs = heldContainer.getNextScheduleTime();
           if (currentTs >= nextScheduleTs) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index fd88973..e8038a4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -27,7 +27,7 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -93,6 +93,8 @@ class Fetcher extends Thread {
 
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
+  
+  private LinkedHashSet<InputAttemptIdentifier> remaining;
 
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
@@ -157,6 +159,7 @@ class Fetcher extends Thread {
   public void run() {
     try {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
+        remaining = null; // Safety.
         MapHost host = null;
         try {
           // If merge is on, block
@@ -233,7 +236,7 @@ class Fetcher extends Thread {
     }
     
     // List of maps to be fetched yet
-    Set<InputAttemptIdentifier> remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
+    remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
     
     // Construct the url and connect
     DataInputStream input;
@@ -319,7 +322,7 @@ class Fetcher extends Thread {
       // yet_to_be_fetched list and marking the failed tasks.
       InputAttemptIdentifier[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
-        failedTasks = copyMapOutput(host, input, remaining);
+        failedTasks = copyMapOutput(host, input);
       }
       
       if(failedTasks != null && failedTasks.length > 0) {
@@ -346,8 +349,7 @@ class Fetcher extends Thread {
   private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
   
   private InputAttemptIdentifier[] copyMapOutput(MapHost host,
-                                DataInputStream input,
-                                Set<InputAttemptIdentifier> remaining) {
+                                DataInputStream input) {
     MapOutput mapOutput = null;
     InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = -1;
@@ -376,6 +378,11 @@ class Fetcher extends Thread {
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, srcAttemptId)) {
+        if (srcAttemptId == null) {
+          LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+          srcAttemptId = getNextRemainingAttempt();
+        }
+        assert(srcAttemptId != null);
         return new InputAttemptIdentifier[] {srcAttemptId};
       }
       
@@ -474,6 +481,14 @@ class Fetcher extends Thread {
     
     return true;
   }
+  
+  private InputAttemptIdentifier getNextRemainingAttempt() {
+    if (remaining.size() > 0) {
+      return remaining.iterator().next();
+    } else {
+      return null;
+    }
+  }
 
   /**
    * Create the map-output-url. This will contain all the map ids

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 33da660..db4c794 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -189,15 +189,13 @@ class ShuffleScheduler {
       hostFailures.put(hostname, new IntWritable(1));
     }
     if (failures >= abortFailureLimit) {
-      try {
-        throw new IOException(failures
+      IOException ioe = new IOException(failures
             + " failures downloading "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
                 inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
                 srcAttempt.getAttemptNumber()));
-      } catch (IOException ie) {
-        shuffle.reportException(ie);
-      }
+      ioe.fillInStackTrace();
+      shuffle.reportException(ioe);
     }
     
     checkAndInformJobTracker(failures, srcAttempt, readError);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 9aeb65d..06b9314 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -51,6 +51,8 @@ public class DiskFetchedInput extends FetchedInput {
     this.localFS = FileSystem.getLocal(conf);
     this.outputPath = filenameAllocator.getInputFileForWrite(
         this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), actualSize);
+    // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
+    // otherwise fetches for the same task but from different attempts would clobber each other.
     this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 70059b0..1231dc3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -151,7 +151,6 @@ public class Fetcher implements Callable<FetchResult> {
       for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
           .hasNext();) {
         fetcherCallback.fetchFailed(host, leftIter.next(), true);
-        leftIter.remove();
       }
       return new FetchResult(host, port, partition, remaining);
     }
@@ -170,7 +169,6 @@ public class Fetcher implements Callable<FetchResult> {
       LOG.warn("Fetch Failure from host while connecting: " + host
           + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e);
       fetcherCallback.fetchFailed(host, firstAttempt, false);
-      remaining.remove(firstAttempt);
       return new FetchResult(host, port, partition, remaining);
     }
 
@@ -190,7 +188,6 @@ public class Fetcher implements Callable<FetchResult> {
       LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
       for (InputAttemptIdentifier left : failedInputs) {
         fetcherCallback.fetchFailed(host, left, false);
-        remaining.remove(left);
       }
     }