You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/10 03:19:04 UTC

git commit: TEZ-667. BroadcastShuffleManager should not report errors after it has been closed. (sseth)

Updated Branches:
  refs/heads/master 7bcf397c1 -> ecf6adf8c


TEZ-667. BroadcastShuffleManager should not report errors after it has
been closed. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ecf6adf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ecf6adf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ecf6adf8

Branch: refs/heads/master
Commit: ecf6adf8c4d809eeb2e770cfa1a04007be33fb2a
Parents: 7bcf397
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Dec 9 18:18:48 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Dec 9 18:18:48 2013 -0800

----------------------------------------------------------------------
 .../input/BroadcastShuffleManager.java          | 60 +++++++++++++-------
 1 file changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ecf6adf8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index a4acff6..d6af464 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -59,10 +59,10 @@ import org.apache.tez.runtime.library.shuffle.common.FetchResult;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
 import org.apache.tez.runtime.library.shuffle.common.Fetcher;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
 import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
 import org.apache.tez.runtime.library.shuffle.common.InputHost;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
@@ -88,6 +88,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
   private final ExecutorService schedulerRawExecutor;
   private final ListeningExecutorService schedulerExecutor;
+  private final RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
   
   private final BlockingQueue<FetchedInput> completedInputs;
   private final Set<InputIdentifier> completedInputSet;
@@ -121,6 +122,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
   
   private volatile Throwable shuffleError;
   
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  
   // TODO NEWTEZ Add counters.
   
   public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
@@ -206,9 +209,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
   }
   
   public void run() {
-    RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
-    ListenableFuture<Void> runShuffleFuture = schedulerExecutor
-        .submit(callable);
+    ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
     Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
     // Shutdown this executor once this task, and the callback complete.
     schedulerExecutor.shutdown();
@@ -218,7 +219,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
     @Override
     public Void call() throws Exception {
-      while (numCompletedInputs.get() < numInputs) {
+      while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
         lock.lock();
         try {
           if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
@@ -245,7 +246,17 @@ public class BroadcastShuffleManager implements FetcherCallback {
             int maxFetchersToRun = numFetchers - numRunningFetchers.get();
             int count = 0;
             while (pendingHosts.peek() != null) {
-              InputHost inputHost = pendingHosts.take();
+              InputHost inputHost = null;
+              try {
+                inputHost = pendingHosts.take();
+              } catch (InterruptedException e) {
+                if (isShutdown.get()) {
+                  LOG.info("Interrupted and hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
+                  break;
+                } else {
+                  throw e;
+                }
+              }
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Processing pending host: " + inputHost.toDetailedString());
               }
@@ -253,6 +264,9 @@ public class BroadcastShuffleManager implements FetcherCallback {
                 LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
                 Fetcher fetcher = constructFetcherForHost(inputHost);
                 numRunningFetchers.incrementAndGet();
+                if (isShutdown.get()) {
+                  LOG.info("hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
+                }
                 ListenableFuture<FetchResult> future = fetcherExecutor
                     .submit(fetcher);
                 Futures.addCallback(future, fetchFutureCallback);
@@ -271,7 +285,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
           }
         }
       }
-      LOG.info("Shutting down FetchScheduler");
+      LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
       // TODO NEWTEZ Maybe clean up inputs.
       if (!fetcherExecutor.isShutdown()) {
         fetcherExecutor.shutdownNow();
@@ -490,12 +504,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
   /////////////////// End of Methods from FetcherCallbackHandler
 
   public void shutdown() throws InterruptedException {
+    isShutdown.set(true);
+    if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
+      this.schedulerExecutor.shutdownNow(); // Interrupt the running scheduler task
+    }
     if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
-      this.fetcherExecutor.shutdown();
-      this.fetcherExecutor.awaitTermination(2000l, TimeUnit.MILLISECONDS);
-      if (!this.fetcherExecutor.isShutdown()) {
-        this.fetcherExecutor.shutdownNow();
-      }
+      this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
     }
   }
   
@@ -611,8 +625,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
     @Override
     public void onFailure(Throwable t) {
-      LOG.error("Scheduler failed with error: ", t);
-      inputContext.fatalError(t, "Broadcast Scheduler Failed");
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring error: " + t);
+      } else {
+        LOG.error("Scheduler failed with error: ", t);
+        inputContext.fatalError(t, "Broadcast Scheduler Failed");
+      }
     }
     
   }
@@ -645,10 +663,14 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
     @Override
     public void onFailure(Throwable t) {
-      LOG.error("Fetcher failed with error: ", t);
-      shuffleError = t;
-      inputContext.fatalError(t, "Fetch failed");
-      doBookKeepingForFetcherComplete();
+      if (isShutdown.get()) {
+        LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+      } else {
+        LOG.error("Fetcher failed with error: ", t);
+        shuffleError = t;
+        inputContext.fatalError(t, "Fetch failed");
+        doBookKeepingForFetcherComplete();
+      }
     }
   }
 }