You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/20 00:30:43 UTC

git commit: TEZ-952. Port MAPREDUCE-5209 and MAPREDUCE-5251. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master ece53e34a -> 5a24e979a


TEZ-952. Port MAPREDUCE-5209 and MAPREDUCE-5251. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5a24e979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5a24e979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5a24e979

Branch: refs/heads/master
Commit: 5a24e979a8509da48b9e9dabc00e3e24944c6355
Parents: ece53e3
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Mar 19 16:30:11 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Mar 19 16:30:11 2014 -0700

----------------------------------------------------------------------
 .../tez/runtime/library/common/shuffle/impl/Fetcher.java    | 9 ++++++++-
 .../library/common/shuffle/impl/ShuffleScheduler.java       | 9 +++++++--
 .../apache/tez/runtime/library/shuffle/common/Fetcher.java  | 2 ++
 3 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a24e979/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index a81b83e..db72e92 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -419,7 +419,14 @@ class Fetcher extends Thread {
       }
       
       // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
+      try {
+        mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
+      } catch (IOException e) {
+        // Kill the reduce attempt
+        ioErrs.increment(1);
+        scheduler.reportLocalError(e);
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a24e979/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index f1f12ef..3fbb6b1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -250,7 +250,12 @@ class ShuffleScheduler {
     
     penalties.add(new Penalty(host, delay));    
   }
-  
+
+  public void reportLocalError(IOException ioe) {
+    LOG.error("Shuffle failed : caused by local error", ioe);
+    shuffle.reportException(ioe);
+  }
+
   // Notify the AM  
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
@@ -449,7 +454,7 @@ class ShuffleScheduler {
       }
     }
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "s");
+             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
   }
 
   public synchronized void resetKnownMaps() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a24e979/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 6d4a5d1..b2ac9dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -247,6 +247,8 @@ public class Fetcher implements Callable<FetchResult> {
       }
 
       // Get the location for the map output - either in-memory or on-disk
+      
+      // TODO TEZ-957. handle IOException here when Broadcast has better error checking
       fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
 
       // TODO NEWTEZ No concept of WAIT at the moment.