You are viewing a plain text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/10 03:19:04 UTC
git commit: TEZ-667. BroadcastShuffleManager should not report errors
after it has been closed. (sseth)
Updated Branches:
refs/heads/master 7bcf397c1 -> ecf6adf8c
TEZ-667. BroadcastShuffleManager should not report errors after it has
been closed. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ecf6adf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ecf6adf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ecf6adf8
Branch: refs/heads/master
Commit: ecf6adf8c4d809eeb2e770cfa1a04007be33fb2a
Parents: 7bcf397
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Dec 9 18:18:48 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Dec 9 18:18:48 2013 -0800
----------------------------------------------------------------------
.../input/BroadcastShuffleManager.java | 60 +++++++++++++-------
1 file changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ecf6adf8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index a4acff6..d6af464 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -59,10 +59,10 @@ import org.apache.tez.runtime.library.shuffle.common.FetchResult;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.Fetcher;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
import org.apache.tez.runtime.library.shuffle.common.InputHost;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
@@ -88,6 +88,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final ExecutorService schedulerRawExecutor;
private final ListeningExecutorService schedulerExecutor;
+ private final RunBroadcastShuffleCallable schedulerCallable = new RunBroadcastShuffleCallable();
private final BlockingQueue<FetchedInput> completedInputs;
private final Set<InputIdentifier> completedInputSet;
@@ -121,6 +122,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
private volatile Throwable shuffleError;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
// TODO NEWTEZ Add counters.
public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
@@ -206,9 +209,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
}
public void run() {
- RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
- ListenableFuture<Void> runShuffleFuture = schedulerExecutor
- .submit(callable);
+ ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
// Shutdown this executor once this task, and the callback complete.
schedulerExecutor.shutdown();
@@ -218,7 +219,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
@Override
public Void call() throws Exception {
- while (numCompletedInputs.get() < numInputs) {
+ while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
lock.lock();
try {
if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
@@ -245,7 +246,17 @@ public class BroadcastShuffleManager implements FetcherCallback {
int maxFetchersToRun = numFetchers - numRunningFetchers.get();
int count = 0;
while (pendingHosts.peek() != null) {
- InputHost inputHost = pendingHosts.take();
+ InputHost inputHost = null;
+ try {
+ inputHost = pendingHosts.take();
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info("Interrupted and hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
+ break;
+ } else {
+ throw e;
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Processing pending host: " + inputHost.toDetailedString());
}
@@ -253,6 +264,9 @@ public class BroadcastShuffleManager implements FetcherCallback {
LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
Fetcher fetcher = constructFetcherForHost(inputHost);
numRunningFetchers.incrementAndGet();
+ if (isShutdown.get()) {
+ LOG.info("hasBeenShutdown, Breaking out of BroadcastScheduler Loop");
+ }
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
Futures.addCallback(future, fetchFutureCallback);
@@ -271,7 +285,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
}
}
}
- LOG.info("Shutting down FetchScheduler");
+ LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
// TODO NEWTEZ Maybe clean up inputs.
if (!fetcherExecutor.isShutdown()) {
fetcherExecutor.shutdownNow();
@@ -490,12 +504,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
/////////////////// End of Methods from FetcherCallbackHandler
public void shutdown() throws InterruptedException {
+ isShutdown.set(true);
+ if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
+ this.schedulerExecutor.shutdownNow(); // Interrupt all running fetchers
+ }
if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
- this.fetcherExecutor.shutdown();
- this.fetcherExecutor.awaitTermination(2000l, TimeUnit.MILLISECONDS);
- if (!this.fetcherExecutor.isShutdown()) {
- this.fetcherExecutor.shutdownNow();
- }
+ this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
}
}
@@ -611,8 +625,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
@Override
public void onFailure(Throwable t) {
- LOG.error("Scheduler failed with error: ", t);
- inputContext.fatalError(t, "Broadcast Scheduler Failed");
+ if (isShutdown.get()) {
+ LOG.info("Already shutdown. Ignoring error: " + t);
+ } else {
+ LOG.error("Scheduler failed with error: ", t);
+ inputContext.fatalError(t, "Broadcast Scheduler Failed");
+ }
}
}
@@ -645,10 +663,14 @@ public class BroadcastShuffleManager implements FetcherCallback {
@Override
public void onFailure(Throwable t) {
- LOG.error("Fetcher failed with error: ", t);
- shuffleError = t;
- inputContext.fatalError(t, "Fetch failed");
- doBookKeepingForFetcherComplete();
+ if (isShutdown.get()) {
+ LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
+ } else {
+ LOG.error("Fetcher failed with error: ", t);
+ shuffleError = t;
+ inputContext.fatalError(t, "Fetch failed");
+ doBookKeepingForFetcherComplete();
+ }
}
}
}