You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/04/16 20:48:14 UTC
git commit: TEZ-919. Fix shutdown handling for Shuffle. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 36e940c6f -> 6b1871970
TEZ-919. Fix shutdown handling for Shuffle. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6b187197
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6b187197
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6b187197
Branch: refs/heads/master
Commit: 6b1871970a4cd21ae5ad96f2e0c60d922066aa53
Parents: 36e940c
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 16 11:44:41 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 16 11:44:41 2014 -0700
----------------------------------------------------------------------
.../library/common/shuffle/impl/Fetcher.java | 102 ++++++--
.../library/common/shuffle/impl/Shuffle.java | 237 +++++++++++++++----
.../common/shuffle/impl/ShuffleScheduler.java | 1 +
.../exceptions/InputAlreadyClosedException.java | 38 +++
.../library/input/ShuffledMergedInput.java | 7 +-
.../runtime/library/shuffle/common/Fetcher.java | 134 +++++++----
.../shuffle/common/impl/ShuffleManager.java | 109 +++++----
7 files changed, 474 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 32fad76..ac12f23 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -25,7 +25,6 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
-import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.LinkedHashSet;
@@ -80,6 +79,7 @@ class Fetcher extends Thread {
private final ShuffleClientMetrics metrics;
private final Shuffle shuffle;
private final int id;
+ private final String logIdentifier;
private static int nextId = 0;
private int currentPartition = -1;
@@ -104,10 +104,16 @@ class Fetcher extends Thread {
private boolean keepAlive = false;
+ volatile HttpURLConnection connection;
+ volatile DataInputStream input;
+
public Fetcher(Configuration job,
ShuffleScheduler scheduler, MergeManager merger,
ShuffleClientMetrics metrics,
- Shuffle shuffle, SecretKey jobTokenSecret, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, TezInputContext inputContext) throws IOException {
+ Shuffle shuffle, SecretKey jobTokenSecret,
+ boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec,
+ TezInputContext inputContext) throws IOException {
+ setDaemon(true);
this.scheduler = scheduler;
this.merger = merger;
this.metrics = metrics;
@@ -161,7 +167,8 @@ class Fetcher extends Thread {
this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
- setName("fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id);
+ this.logIdentifier = "fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + id;
+ setName(logIdentifier);
setDaemon(true);
synchronized (Fetcher.class) {
@@ -195,6 +202,7 @@ class Fetcher extends Thread {
// Shuffle
copyFromHost(host);
} finally {
+ cleanupCurrentConnection();
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
@@ -211,6 +219,7 @@ class Fetcher extends Thread {
public void shutDown() throws InterruptedException {
this.stopped = true;
interrupt();
+ cleanupCurrentConnection();
try {
join(5000);
} catch (InterruptedException ie) {
@@ -221,6 +230,31 @@ class Fetcher extends Thread {
}
}
+ private Object cleanupLock = new Object();
+ private void cleanupCurrentConnection() {
+ // Synchronizing on cleanupLock to ensure we don't run into a parallel close
+ // Can't synchronize on the main class itself since that would cause the
+ // shutdown request to block
+ synchronized (cleanupLock) {
+ try {
+ if (input != null) {
+ LOG.info("Closing input on " + logIdentifier);
+ input.close();
+ }
+ if (connection != null) {
+ LOG.info("Closing connection on " + logIdentifier);
+ connection.disconnect();
+ }
+ } catch (IOException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+ } else {
+ LOG.info("Exception while shutting down fetcher " + logIdentifier + ": " + e.getMessage());
+ }
+ }
+ }
+ }
+
@VisibleForTesting
protected HttpURLConnection openConnection(URL url) throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -263,8 +297,6 @@ class Fetcher extends Thread {
remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
// Construct the url and connect
- DataInputStream input;
- HttpURLConnection connection = null;
boolean connectSucceeded = false;
try {
@@ -285,7 +317,14 @@ class Fetcher extends Thread {
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- connect(connection, connectionTimeout);
+ connect(connectionTimeout);
+
+ if (stopped) {
+ LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+ cleanupCurrentConnection();
+ putBackRemainingMapOutputs(host);
+ return;
+ }
connectSucceeded = true;
input = new DataInputStream(new BufferedInputStream(connection.getInputStream(), bufferSize));
@@ -313,6 +352,12 @@ class Fetcher extends Thread {
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
+ if (stopped) {
+ LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+ cleanupCurrentConnection();
+ putBackRemainingMapOutputs(host);
+ return;
+ }
if (keepAlive && connection != null) {
//Read the response body in case of error. This helps in connection reuse.
//Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
@@ -367,9 +412,9 @@ class Fetcher extends Thread {
readErrorStream(connection.getErrorStream());
}
}
-
- IOUtils.cleanup(LOG, input);
-
+
+ cleanupCurrentConnection();
+
// Sanity check
if (failedTasks == null && !remaining.isEmpty()) {
throw new IOException("server didn't return all expected map outputs: "
@@ -503,6 +548,16 @@ class Fetcher extends Thread {
metrics.successFetch();
return null;
} catch (IOException ioe) {
+ if (stopped) {
+ LOG.info("Not reporting fetch failure for exception during data copy: ["
+ + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+ cleanupCurrentConnection();
+ if (mapOutput != null) {
+ mapOutput.abort(); // Release resources
+ }
+ // Don't need to put back - since that's handled by the invoker
+ return EMPTY_ATTEMPT_ID_ARRAY;
+ }
ioErrs.increment(1);
if (srcAttemptId == null || mapOutput == null) {
LOG.info("fetcher#" + id + " failed to read map header" +
@@ -610,8 +665,7 @@ class Fetcher extends Thread {
* only on the last failure. Instead of connecting with a timeout of
* X, we try connecting with a timeout of x < X but multiple times.
*/
- private void connect(URLConnection connection, int connectionTimeout)
- throws IOException {
+ private void connect(int connectionTimeout) throws IOException {
int unit = 0;
if (connectionTimeout < 0) {
throw new IOException("Invalid timeout "
@@ -626,6 +680,14 @@ class Fetcher extends Thread {
connection.connect();
break;
} catch (IOException ioe) {
+
+ // Don't attempt another connect if already stopped.
+ if (stopped) {
+ LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: ["
+ + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+ return;
+ }
+
// update the total remaining connect-timeout
connectionTimeout -= unit;
@@ -646,31 +708,31 @@ class Fetcher extends Thread {
}
private void shuffleToMemory(MapHost host, MapOutput mapOutput,
- InputStream input,
+ InputStream localInput,
int decompressedLength,
int compressedLength) throws IOException {
IFileInputStream checksumIn =
- new IFileInputStream(input, compressedLength, ifileReadAhead, ifileReadAheadLength);
+ new IFileInputStream(localInput, compressedLength, ifileReadAhead, ifileReadAheadLength);
- input = checksumIn;
+ localInput = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
decompressor.reset();
- input = codec.createInputStream(input, decompressor);
+ localInput = codec.createInputStream(localInput, decompressor);
}
// Copy map-output into an in-memory buffer
byte[] shuffleData = mapOutput.getMemory();
try {
- IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
+ IOUtils.readFully(localInput, shuffleData, 0, shuffleData.length);
metrics.inputBytes(shuffleData.length);
LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
mapOutput.getAttemptIdentifier());
} catch (IOException ioe) {
// Close the streams
- IOUtils.cleanup(LOG, input);
+ IOUtils.cleanup(LOG, localInput);
// Re-throw
throw ioe;
@@ -679,7 +741,7 @@ class Fetcher extends Thread {
}
private void shuffleToDisk(MapHost host, MapOutput mapOutput,
- InputStream input,
+ InputStream localInput,
long compressedLength)
throws IOException {
// Copy data to local-disk
@@ -689,7 +751,7 @@ class Fetcher extends Thread {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
- int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+ int n = localInput.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
mapOutput.getAttemptIdentifier());
@@ -706,7 +768,7 @@ class Fetcher extends Thread {
output.close();
} catch (IOException ioe) {
// Close the streams
- IOUtils.cleanup(LOG, input, output);
+ IOUtils.cleanup(LOG, localInput, output);
// Re-throw
throw ioe;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index a27fa31..b055f16 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -18,10 +18,13 @@
package org.apache.tez.runtime.library.common.shuffle.impl;
import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.crypto.SecretKey;
@@ -42,16 +45,23 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Usage: Create instance, setInitialMemoryAllocated(long), run()
@@ -66,7 +76,6 @@ public class Shuffle implements ExceptionReporter {
private final Configuration conf;
private final TezInputContext inputContext;
- private final int numInputs;
private final ShuffleClientMetrics metrics;
@@ -78,22 +87,35 @@ public class Shuffle implements ExceptionReporter {
private final CompressionCodec codec;
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
+ private final int numFetchers;
private Throwable throwable = null;
private String throwingThreadName = null;
- private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
+ private final RunShuffleCallable runShuffleCallable;
+ private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
+ private final ListeningExecutorService executor;
+
+ private final String srcNameTrimmed;
+
+ private final List<Fetcher> fetchers;
+
+ private AtomicBoolean isShutDown = new AtomicBoolean(false);
+ private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
+ private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
+ private AtomicBoolean mergerClosed = new AtomicBoolean(false);
public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs,
long initialMemoryAvailable) throws IOException {
this.inputContext = inputContext;
this.conf = conf;
- this.numInputs = numInputs;
this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+ this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName());
+
this.jobTokenSecret = ShuffleUtils
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(TezConfiguration.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
@@ -149,7 +171,7 @@ public class Shuffle implements ExceptionReporter {
scheduler = new ShuffleScheduler(
this.inputContext,
this.conf,
- this.numInputs,
+ numInputs,
this,
shuffledInputsCounter,
reduceShuffleBytes,
@@ -174,24 +196,62 @@ public class Shuffle implements ExceptionReporter {
codec,
ifileReadAhead,
ifileReadAheadLength);
+
+ ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("ShuffleAndMergeRunner [" + srcNameTrimmed + "]").build());
+
+ int configuredNumFetchers =
+ conf.getInt(
+ TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+ numFetchers = Math.min(configuredNumFetchers, numInputs);
+ LOG.info("Num fetchers being started: " + numFetchers);
+ fetchers = Lists.newArrayListWithCapacity(numFetchers);
+
+ executor = MoreExecutors.listeningDecorator(rawExecutor);
+ runShuffleCallable = new RunShuffleCallable();
}
public void handleEvents(List<Event> events) {
- eventHandler.handleEvents(events);
+ if (!isShutDown.get()) {
+ eventHandler.handleEvents(events);
+ } else {
+ LOG.info("Ignoring events since already shutdown. EventCount: " + events.size());
+ }
+
}
/**
* Indicates whether the Shuffle and Merge processing is complete.
* @return false if not complete, true if complete or if an error occurred.
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws InputAlreadyClosedException
*/
- public boolean isInputReady() {
+ // ZZZ Deal with these methods.
+ public boolean isInputReady() throws IOException, InterruptedException {
+ if (isShutDown.get()) {
+ throw new InputAlreadyClosedException();
+ }
+ if (throwable != null) {
+ handleThrowable(throwable);
+ }
if (runShuffleFuture == null) {
return false;
}
- // TODO This may return true, followed by the reader throwing the actual Exception.
- // Fix as part of TEZ-919.
+ // Don't need to check merge status, since runShuffleFuture will only
+ // complete once merge is complete.
return runShuffleFuture.isDone();
- //return scheduler.isDone() && merger.isMergeComplete();
+ }
+
+  /**
+   * Rethrows {@code t}; never returns normally. IOException and InterruptedException are
+   * rethrown as-is, and any other Throwable is wrapped in an UndeclaredThrowableException.
+   */
+  private void handleThrowable(Throwable t) throws IOException, InterruptedException {
+    if (t instanceof InterruptedException) {
+      throw (InterruptedException) t;
+    }
+    if (t instanceof IOException) {
+      throw (IOException) t;
+    }
+    throw new UndeclaredThrowableException(t);
}
/**
@@ -200,56 +260,59 @@ public class Shuffle implements ExceptionReporter {
* @throws IOException
* @throws InterruptedException
*/
+ // ZZZ Deal with these methods.
public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
Preconditions.checkState(runShuffleFuture != null,
"waitForInput can only be called after run");
- TezRawKeyValueIterator kvIter;
+ TezRawKeyValueIterator kvIter = null;
try {
kvIter = runShuffleFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- throw (IOException) cause;
- } else if (cause instanceof InterruptedException) {
- throw (InterruptedException) cause;
- } else {
- throw new TezUncheckedException(
- "Unexpected exception type while running Shuffle and Merge", cause);
- }
+ handleThrowable(cause);
+ }
+ if (isShutDown.get()) {
+ throw new InputAlreadyClosedException();
+ }
+ if (throwable != null) {
+ handleThrowable(throwable);
}
return kvIter;
}
public void run() throws IOException {
merger.configureAndStart();
- RunShuffleCallable runShuffle = new RunShuffleCallable();
- runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
- new Thread(runShuffleFuture, "ShuffleMergeRunner ["
- + TezUtils.cleanVertexName(inputContext.getSourceVertexName() + "]")).start();
+ runShuffleFuture = executor.submit(runShuffleCallable);
+ Futures.addCallback(runShuffleFuture, new ShuffleRunnerFutureCallback());
+ executor.shutdown();
}
-
+
+  /**
+   * Shuts this Shuffle down exactly once (guarded by isShutDown). Cancels the
+   * shuffle-and-merge runner with interruption, if run() ever started it, then releases
+   * fetchers, scheduler and merger on a best-effort basis. Later calls are no-ops.
+   */
+  public void shutdown() {
+    if (!isShutDown.getAndSet(true)) {
+      // Interrupt so that the scheduler / merger sees this interrupt.
+      LOG.info("Shutting down Shuffle for source: " + srcNameTrimmed);
+      // runShuffleFuture is only assigned in run(); an input that is closed without ever
+      // being started would otherwise hit a NullPointerException here.
+      ListenableFuture<TezRawKeyValueIterator> future = runShuffleFuture;
+      if (future != null) {
+        future.cancel(true);
+      }
+      cleanupIgnoreErrors();
+    }
+  }
+
private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
@Override
public TezRawKeyValueIterator call() throws IOException, InterruptedException {
- // TODO NEWTEZ Limit # fetchers to number of inputs
- final int numFetchers =
- conf.getInt(
- TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
- TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
- Fetcher[] fetchers = new Fetcher[numFetchers];
- for (int i = 0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher(conf, scheduler, merger, metrics,
- Shuffle.this, jobTokenSecret, ifileReadAhead, ifileReadAheadLength,
- codec, inputContext);
-
- fetchers[i].start();
+
+ synchronized (this) {
+ for (int i = 0; i < numFetchers; ++i) {
+ Fetcher fetcher = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret,
+ ifileReadAhead, ifileReadAheadLength, codec, inputContext);
+ fetchers.add(fetcher);
+ fetcher.start();
+ }
}
+
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
synchronized (this) {
if (throwable != null) {
- inputContext.fatalError(throwable, "error in shuffle from thread :"
- + throwingThreadName);
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
@@ -257,13 +320,10 @@ public class Shuffle implements ExceptionReporter {
}
// Stop the map-output fetcher threads
- for (Fetcher fetcher : fetchers) {
- fetcher.shutDown();
- }
- fetchers = null;
+ cleanupFetchers(false);
// stop the scheduler
- scheduler.close();
+ cleanupShuffleScheduler(false);
// Finish the on-going merges...
@@ -271,15 +331,12 @@ public class Shuffle implements ExceptionReporter {
try {
kvIter = merger.close();
} catch (Throwable e) {
- inputContext.fatalError(e, "Error while doing final merge ");
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (Shuffle.this) {
if (throwable != null) {
- inputContext.fatalError(throwable, "error in shuffle from thread :"
- + throwingThreadName);
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
@@ -291,6 +348,73 @@ public class Shuffle implements ExceptionReporter {
}
}
+  /**
+   * Shuts down every fetcher thread at most once (guarded by fetchersClosed) and clears the
+   * fetcher list. All fetchers are asked to shut down even if an earlier one is interrupted.
+   *
+   * @param ignoreErrors if true, an InterruptedException from a fetcher's shutDown() is logged
+   *     and swallowed; if false, the first one seen is rethrown after all fetchers were handled
+   * @throws InterruptedException the first interrupt encountered, when ignoreErrors is false
+   */
+  private synchronized void cleanupFetchers(boolean ignoreErrors) throws InterruptedException {
+    // Stop the fetcher threads
+    InterruptedException ie = null;
+    if (!fetchersClosed.getAndSet(true)) {
+      for (Fetcher fetcher : fetchers) {
+        try {
+          fetcher.shutDown();
+        } catch (InterruptedException e) {
+          if (ignoreErrors) {
+            LOG.info("Interrupted while shutting down fetchers. Ignoring.");
+          } else if (ie == null) {
+            // Remember the first interrupt; keep shutting down the remaining fetchers.
+            ie = e;
+          } else {
+            LOG.warn("Ignoring exception while shutting down fetcher since a previous one was seen and will be thrown "
+                + e);
+          }
+        }
+      }
+      fetchers.clear();
+      // throw only the first exception while attempting to shutdown.
+      if (ie != null) {
+        throw ie;
+      }
+    }
+  }
+
+  /**
+   * Closes the shuffle scheduler at most once (guarded by schedulerClosed).
+   *
+   * @param ignoreErrors when true, an interrupt during close is logged instead of rethrown
+   * @throws InterruptedException if interrupted while closing and ignoreErrors is false
+   */
+  private void cleanupShuffleScheduler(boolean ignoreErrors) throws InterruptedException {
+    if (schedulerClosed.getAndSet(true)) {
+      return; // already closed by an earlier call
+    }
+    try {
+      scheduler.close();
+    } catch (InterruptedException interrupted) {
+      if (!ignoreErrors) {
+        throw interrupted;
+      }
+      LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
+    }
+  }
+
+  /**
+   * Closes the merge manager at most once (guarded by mergerClosed); the iterator returned
+   * by merger.close() is discarded.
+   * NOTE(review): the normal completion path in RunShuffleCallable calls merger.close()
+   * directly without setting mergerClosed, so a later shutdown() may close the merger a
+   * second time. Confirm that merger.close() tolerates this.
+   *
+   * @param ignoreErrors when true, any Throwable from merger.close() is logged and swallowed
+   * @throws Throwable whatever merger.close() throws, when ignoreErrors is false
+   */
+  private void cleanupMerger(boolean ignoreErrors) throws Throwable {
+    if (mergerClosed.getAndSet(true)) {
+      return; // already closed by an earlier call
+    }
+    try {
+      merger.close();
+    } catch (Throwable failure) {
+      if (!ignoreErrors) {
+        throw failure;
+      }
+      LOG.info("Exception while trying to shutdown merger, Ignoring", failure);
+    }
+  }
+
+  /**
+   * Best-effort release of fetchers, scheduler and merger, used by shutdown() and after the
+   * shuffle runner fails. Each step is attempted even if an earlier one throws (previously a
+   * RuntimeException from fetcher cleanup skipped the scheduler and merger). Failures are
+   * logged rather than silently discarded, and never propagated.
+   */
+  private void cleanupIgnoreErrors() {
+    try {
+      cleanupFetchers(true);
+    } catch (Throwable t) {
+      LOG.info("Ignoring error while cleaning up fetchers", t);
+    }
+    try {
+      cleanupShuffleScheduler(true);
+    } catch (Throwable t) {
+      LOG.info("Ignoring error while cleaning up shuffle scheduler", t);
+    }
+    try {
+      cleanupMerger(true);
+    } catch (Throwable t) {
+      LOG.info("Ignoring error while cleaning up merger", t);
+    }
+  }
+
@Private
public synchronized void reportException(Throwable t) {
if (throwable == null) {
@@ -316,4 +440,23 @@ public class Shuffle implements ExceptionReporter {
public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
}
+
+  /**
+   * Observes completion of the shuffle-and-merge runner future. Success is only logged.
+   * A failure seen after shutdown() was requested (e.g. the cancellation issued by
+   * shutdown() itself) is logged and ignored. Any other failure is reported through
+   * inputContext.fatalError and followed by a best-effort cleanup.
+   */
+  private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
+    @Override
+    public void onSuccess(TezRawKeyValueIterator result) {
+      LOG.info("Shuffle Runner thread complete");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // TODO (was "ZZZ"): revisit failures that race with an in-progress shutdown.
+      if (!isShutDown.get()) {
+        LOG.error("ShuffleRunner failed with error", t);
+        inputContext.fatalError(t, "Shuffle Runner Failed");
+        cleanupIgnoreErrors();
+        return;
+      }
+      LOG.info("Already shutdown. Ignoring error: ", t);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 45a50ad..41ce489 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -570,6 +570,7 @@ class ShuffleScheduler {
}
+  /**
+   * Stops the scheduler by interrupting the referee thread and waiting for it to exit.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while joining the
+   *     referee thread
+   */
public void close() throws InterruptedException {
+ // TODO (was "ZZZ"): should close() also interrupt the scheduler itself, not just the referee?
referee.interrupt();
referee.join();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java
new file mode 100644
index 0000000..ee41066
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/exceptions/InputAlreadyClosedException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.exceptions;
+
+
+/**
+ * Unchecked exception signalling that an input was used after it had already been closed
+ * (shut down).
+ */
+public class InputAlreadyClosedException extends RuntimeException {
+
+  private static final long serialVersionUID = 5094990552896724803L;
+
+  /** Message used by the constructors that do not take one explicitly. */
+  private static final String DEFAULT_MESSAGE = "Input already closed";
+
+  /** Creates an exception with the default message and no cause. */
+  public InputAlreadyClosedException() {
+    super(DEFAULT_MESSAGE);
+  }
+
+  /** Creates an exception with the default message and the given cause. */
+  public InputAlreadyClosedException(Throwable cause) {
+    super(DEFAULT_MESSAGE, cause);
+  }
+
+  /** Creates an exception with a caller-supplied message and cause. */
+  public InputAlreadyClosedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 0c01f92..7045b5c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -127,8 +127,10 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
* @return true if the input is ready for consumption, or if an error occurred
* processing fetching the input. false if the shuffle and merge are
* still in progress
+ * @throws InterruptedException
+ * @throws IOException
*/
- public synchronized boolean isInputReady() {
+ public synchronized boolean isInputReady() throws IOException, InterruptedException {
Preconditions.checkState(isStarted.get(), "Must start input before invoking this method");
if (getNumPhysicalInputs() == 0) {
return true;
@@ -164,6 +166,9 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
if (this.getNumPhysicalInputs() != 0 && rawIter != null) {
rawIter.close();
}
+ if (shuffle != null) {
+ shuffle.shutdown();
+ }
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index b2ac9dd..9ecf590 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -23,8 +23,6 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@@ -32,15 +30,13 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
-import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -76,6 +72,10 @@ public class Fetcher implements Callable<FetchResult> {
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
+
+ private final String logIdentifier;
+
+ private final AtomicBoolean isShutDown = new AtomicBoolean(false);
private static boolean sslShuffle = false;
private static SSLFactory sslFactory;
@@ -94,12 +94,14 @@ public class Fetcher implements Callable<FetchResult> {
private LinkedHashSet<InputAttemptIdentifier> remaining;
private URL url;
+ private volatile HttpURLConnection connection;
+ private volatile DataInputStream input;
private String encHash;
private String msgToEncode;
private Fetcher(FetcherCallback fetcherCallback,
FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
- Configuration conf) {
+ String srcNameTrimmed) {
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.shuffleSecret = shuffleSecret;
@@ -107,6 +109,7 @@ public class Fetcher implements Callable<FetchResult> {
this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
+ this.logIdentifier = "fetcher [" + srcNameTrimmed +"] " + fetcherIdentifier;
// TODO NEWTEZ Ideally, move this out from here into a static initializer block.
// Re-enable when ssl shuffle support is needed.
@@ -141,40 +144,62 @@ public class Fetcher implements Callable<FetchResult> {
remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
- HttpURLConnection connection;
try {
- connection = connectToShuffleHandler(host, port, partition, srcAttempts);
+ connectToShuffleHandler(host, port, partition, srcAttempts);
} catch (IOException e) {
// ioErrs.increment(1);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
- for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
- .hasNext();) {
- fetcherCallback.fetchFailed(host, leftIter.next(), true);
+ if (isShutDown.get()) {
+ LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+ } else {
+ for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
+ .hasNext();) {
+ fetcherCallback.fetchFailed(host, leftIter.next(), true);
+ }
}
return new FetchResult(host, port, partition, remaining);
}
-
- DataInputStream input;
+ if (isShutDown.get()) {
+ // shutdown would have no effect if in the process of establishing the connection.
+ shutdownInternal();
+ LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
+ return new FetchResult(host, port, partition, remaining);
+ }
try {
input = new DataInputStream(connection.getInputStream());
- validateConnectionResponse(connection, url, msgToEncode, encHash);
+ validateConnectionResponse(msgToEncode, encHash);
} catch (IOException e) {
// ioErrs.increment(1);
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
- InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
- LOG.warn("Fetch Failure from host while connecting: " + host
- + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e);
- fetcherCallback.fetchFailed(host, firstAttempt, false);
- return new FetchResult(host, port, partition, remaining);
+ if (isShutDown.get()) {
+ LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
+ } else {
+ InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
+ LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
+ + " Informing ShuffleManager: ", e);
+ fetcherCallback.fetchFailed(host, firstAttempt, false);
+ return new FetchResult(host, port, partition, remaining);
+ }
}
// By this point, the connection is setup and the response has been
// validated.
+ // Handle any shutdown which may have been invoked.
+ if (isShutDown.get()) {
+ // shutdown would have no effect if in the process of establishing the connection.
+ shutdownInternal();
+ LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
+ return new FetchResult(host, port, partition, remaining);
+ }
+ // After this point, closing the stream and connection, should cause a
+ // SocketException,
+ // which will be ignored since shutdown has been invoked.
+
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
@@ -191,7 +216,7 @@ public class Fetcher implements Callable<FetchResult> {
}
}
- IOUtils.cleanup(LOG, input);
+ shutdown();
// Sanity check
if (failedInputs == null && !remaining.isEmpty()) {
@@ -203,6 +228,36 @@ public class Fetcher implements Callable<FetchResult> {
}
+ public void shutdown() {
+ if (!isShutDown.getAndSet(true)) {
+ shutdownInternal();
+ }
+ }
+
+ private void shutdownInternal() {
+ // Synchronizing on isShutDown to ensure we don't run into a parallel close
+ // Can't synchronize on the main class itself since that would cause the
+ // shutdown request to block
+ synchronized (isShutDown) {
+ try {
+ if (input != null) {
+ LOG.info("Closing input on " + logIdentifier);
+ input.close();
+ }
+ if (connection != null) {
+ LOG.info("Closing connection on " + logIdentifier);
+ connection.disconnect();
+ }
+ } catch (IOException e) {
+ LOG.info("Exception while shutting down fetcher on " + logIdentifier + " : "
+ + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(e);
+ }
+ }
+ }
+ }
+
private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
FetchedInput fetchedInput = null;
InputAttemptIdentifier srcAttemptId = null;
@@ -285,6 +340,8 @@ public class Fetcher implements Callable<FetchResult> {
// metrics.successFetch();
return null;
} catch (IOException ioe) {
+ // ZZZ Add some shutdown code here
+ // ZZZ Make sure any assigned memory inputs are aborted
// ioErrs.increment(1);
if (srcAttemptId == null || fetchedInput == null) {
LOG.info("fetcher" + " failed to read map header" + srcAttemptId
@@ -360,11 +417,11 @@ public class Fetcher implements Callable<FetchResult> {
}
}
- private HttpURLConnection connectToShuffleHandler(String host, int port,
+ private void connectToShuffleHandler(String host, int port,
int partition, List<InputAttemptIdentifier> inputs) throws IOException {
try {
this.url = constructInputURL(host, port, partition, inputs);
- HttpURLConnection connection = openConnection(url);
+ this.connection = openConnection(url);
// generate hash of the url
this.msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -382,8 +439,7 @@ public class Fetcher implements Callable<FetchResult> {
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- connect(connection, connectionTimeout);
- return connection;
+ connect(connectionTimeout);
} catch (IOException e) {
LOG.warn("Failed to connect to " + host + " with " + srcAttempts.size()
+ " inputs", e);
@@ -391,8 +447,7 @@ public class Fetcher implements Callable<FetchResult> {
}
}
- private void validateConnectionResponse(HttpURLConnection connection,
- URL url, String msgToEncode, String encHash) throws IOException {
+ private void validateConnectionResponse(String msgToEncode, String encHash) throws IOException {
int rc = connection.getResponseCode();
if (rc != HttpURLConnection.HTTP_OK) {
throw new IOException("Got invalid response code " + rc + " from " + url
@@ -422,17 +477,8 @@ public class Fetcher implements Callable<FetchResult> {
}
protected HttpURLConnection openConnection(URL url) throws IOException {
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- if (sslShuffle) {
- HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
- try {
- httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
- } catch (GeneralSecurityException ex) {
- throw new IOException(ex);
- }
- httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
- }
- return conn;
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ return connection;
}
/**
@@ -440,8 +486,7 @@ public class Fetcher implements Callable<FetchResult> {
* only on the last failure. Instead of connecting with a timeout of X, we try
* connecting with a timeout of x < X but multiple times.
*/
- private void connect(URLConnection connection, int connectionTimeout)
- throws IOException {
+ private void connect(int connectionTimeout) throws IOException {
int unit = 0;
if (connectionTimeout < 0) {
throw new IOException("Invalid timeout " + "[timeout = "
@@ -456,6 +501,13 @@ public class Fetcher implements Callable<FetchResult> {
connection.connect();
break;
} catch (IOException ioe) {
+
+ // Check if already shutdown and abort subsequent connect attempts
+ if (isShutDown.get()) {
+ LOG.info("Fetcher already shutdown. Not attempting to connect again. Last exception was: ["
+ + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+ return;
+ }
// update the total remaining connect-timeout
connectionTimeout -= unit;
@@ -503,9 +555,9 @@ public class Fetcher implements Callable<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
FetchedInputAllocator inputManager, ApplicationId appId,
- SecretKey shuffleSecret, Configuration conf) {
+ SecretKey shuffleSecret, String srcNameTrimmed) {
this.fetcher = new Fetcher(fetcherCallback, inputManager, appId,
- shuffleSecret, conf);
+ shuffleSecret, srcNameTrimmed);
}
public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6b187197/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 050c0c0..657feed 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -82,14 +82,12 @@ public class ShuffleManager implements FetcherCallback {
private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
private final TezInputContext inputContext;
- private final Configuration conf;
private final int numInputs;
private final FetchedInputAllocator inputManager;
private final ListeningExecutorService fetcherExecutor;
- private final ExecutorService schedulerRawExecutor;
private final ListeningExecutorService schedulerExecutor;
private final RunShuffleCallable schedulerCallable = new RunShuffleCallable();
@@ -99,6 +97,7 @@ public class ShuffleManager implements FetcherCallback {
private final ConcurrentMap<String, InputHost> knownSrcHosts;
private final BlockingQueue<InputHost> pendingHosts;
private final Set<InputAttemptIdentifier> obsoletedInputs;
+ private Set<Fetcher> runningFetchers;
private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
@@ -110,7 +109,6 @@ public class ShuffleManager implements FetcherCallback {
private final Condition wakeLoop = lock.newCondition();
private final int numFetchers;
- private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
// Parameters required by Fetchers
private final SecretKey shuffleSecret;
@@ -122,8 +120,8 @@ public class ShuffleManager implements FetcherCallback {
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
- private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-
+ private final String srcNameTrimmed;
+
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final TezCounter shuffledInputsCounter;
@@ -141,7 +139,6 @@ public class ShuffleManager implements FetcherCallback {
int bufferSize, boolean ifileReadAheadEnabled, int ifileReadAheadLength,
CompressionCodec codec, FetchedInputAllocator inputAllocator) throws IOException {
this.inputContext = inputContext;
- this.conf = conf;
this.numInputs = numInputs;
this.shuffledInputsCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SHUFFLED_INPUTS);
@@ -156,13 +153,16 @@ public class ShuffleManager implements FetcherCallback {
this.ifileReadAheadLength = ifileReadAheadLength;
this.codec = codec;
this.inputManager = inputAllocator;
+
+ this.srcNameTrimmed = TezUtils.cleanVertexName(inputContext.getSourceVertexName());
completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
pendingHosts = new LinkedBlockingQueue<InputHost>();
obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-
+ runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());
+
int maxConfiguredFetchers =
conf.getInt(
TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
@@ -172,20 +172,13 @@ public class ShuffleManager implements FetcherCallback {
ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(
numFetchers,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(
- "Fetcher [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "] #%d")
- .build());
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Fetcher [" + srcNameTrimmed + "] #%d").build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
- this.schedulerRawExecutor = Executors.newFixedThreadPool(
- 1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(
- "ShuffleRunner [" + TezUtils.cleanVertexName(inputContext.getSourceVertexName()) + "]")
- .build());
+
+ ExecutorService schedulerRawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("ShuffleRunner [" + srcNameTrimmed + "]").build());
this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
this.startTime = System.currentTimeMillis();
@@ -224,7 +217,7 @@ public class ShuffleManager implements FetcherCallback {
while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
lock.lock();
try {
- if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
+ if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
if (numCompletedInputs.get() < numInputs) {
wakeLoop.await();
}
@@ -242,12 +235,12 @@ public class ShuffleManager implements FetcherCallback {
if (LOG.isDebugEnabled()) {
LOG.debug("NumCompletedInputs: " + numCompletedInputs);
}
- if (numCompletedInputs.get() < numInputs) {
+ if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
lock.lock();
try {
- int maxFetchersToRun = numFetchers - numRunningFetchers.get();
+ int maxFetchersToRun = numFetchers - runningFetchers.size();
int count = 0;
- while (pendingHosts.peek() != null) {
+ while (pendingHosts.peek() != null && !isShutdown.get()) {
InputHost inputHost = null;
try {
inputHost = pendingHosts.take();
@@ -262,16 +255,16 @@ public class ShuffleManager implements FetcherCallback {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing pending host: " + inputHost.toDetailedString());
}
- if (inputHost.getNumPendingInputs() > 0) {
+ if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
Fetcher fetcher = constructFetcherForHost(inputHost);
- numRunningFetchers.incrementAndGet();
+ runningFetchers.add(fetcher);
if (isShutdown.get()) {
LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
}
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
- Futures.addCallback(future, fetchFutureCallback);
+ Futures.addCallback(future, new FetchFutureCallback(fetcher));
if (++count >= maxFetchersToRun) {
break;
}
@@ -297,9 +290,8 @@ public class ShuffleManager implements FetcherCallback {
}
private Fetcher constructFetcherForHost(InputHost inputHost) {
- FetcherBuilder fetcherBuilder = new FetcherBuilder(
- ShuffleManager.this, inputManager,
- inputContext.getApplicationId(), shuffleSecret, conf);
+ FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, inputManager,
+ inputContext.getApplicationId(), shuffleSecret, srcNameTrimmed);
fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);
@@ -517,15 +509,29 @@ public class ShuffleManager implements FetcherCallback {
/////////////////// End of Methods from FetcherCallbackHandler
public void shutdown() throws InterruptedException {
- isShutdown.set(true);
- if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
- this.schedulerExecutor.shutdownNow(); // Interrupt all running fetchers
- }
- if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
- this.fetcherExecutor.shutdownNow(); // Interrupt all running fetchers
+ if (!isShutdown.getAndSet(true)) {
+ // Shut down any pending fetchers
+ LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "
+ + runningFetchers.size());
+ lock.lock();
+ try {
+ wakeLoop.signal(); // signal the fetch-scheduler
+ for (Fetcher fetcher : runningFetchers) {
+ fetcher.shutdown(); // This could be parallelized.
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
+ this.schedulerExecutor.shutdownNow();
+ }
+ if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+ this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
+ }
}
}
-
+
private void registerCompletedInput(FetchedInput fetchedInput) {
lock.lock();
try {
@@ -653,10 +659,16 @@ public class ShuffleManager implements FetcherCallback {
private class FetchFutureCallback implements FutureCallback<FetchResult> {
+ private final Fetcher fetcher;
+
+ public FetchFutureCallback(Fetcher fetcher) {
+ this.fetcher = fetcher;
+ }
+
private void doBookKeepingForFetcherComplete() {
- numRunningFetchers.decrementAndGet();
lock.lock();
try {
+ runningFetchers.remove(fetcher);
wakeLoop.signal();
} finally {
lock.unlock();
@@ -665,20 +677,27 @@ public class ShuffleManager implements FetcherCallback {
@Override
public void onSuccess(FetchResult result) {
- Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
- if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
- InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
- assert inputHost != null;
- for (InputAttemptIdentifier input : pendingInputs) {
- inputHost.addKnownInput(input);
+ fetcher.shutdown();
+ if (isShutdown.get()) {
+ LOG.info("Already shutdown. Ignoring event from fetcher");
+ } else {
+ Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
+ if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
+ InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
+ assert inputHost != null;
+ for (InputAttemptIdentifier input : pendingInputs) {
+ inputHost.addKnownInput(input);
+ }
+ pendingHosts.add(inputHost);
}
- pendingHosts.add(inputHost);
+ doBookKeepingForFetcherComplete();
}
- doBookKeepingForFetcherComplete();
}
@Override
public void onFailure(Throwable t) {
+ // Unsuccessful - the fetcher may not have shutdown correctly. Try shutting it down.
+ fetcher.shutdown();
if (isShutdown.get()) {
LOG.info("Already shutdown. Ignoring error from fetcher: " + t);
} else {