You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/10 20:08:30 UTC
git commit: TEZ-548. Fix a bug in BroadcastShuffleManager that caused
it to hang if an error occurred in the Scheduler thread. Also fixed
the underlying error in that thread. (sseth)
Updated Branches:
refs/heads/master ad5666a9d -> 2f207c0d8
TEZ-548. Fix a bug in BroadcastShuffleManager that caused it to
hang if an error occurred in the Scheduler thread. Also fixed the
underlying error in that thread. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2f207c0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2f207c0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2f207c0d
Branch: refs/heads/master
Commit: 2f207c0d8cf4117dd30afa315374d5286a8abe46
Parents: ad5666a
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Oct 10 11:07:42 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Oct 10 11:07:42 2013 -0700
----------------------------------------------------------------------
.../BroadcastShuffleInputEventHandler.java | 2 +-
.../input/BroadcastShuffleManager.java | 60 +++++++++++++++-----
.../shuffle/impl/ShuffleInputEventHandler.java | 3 +-
.../library/shuffle/common/InputHost.java | 24 ++++----
4 files changed, 61 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index cd50ec6..5011140 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -72,7 +72,7 @@ public class BroadcastShuffleInputEventHandler {
} catch (InvalidProtocolBufferException e) {
throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
}
- LOG.info("Processing data moveement event with srcIndex: "
+ LOG.info("Processing DataMovementEvent with srcIndex: "
+ dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+ ", attemptNum: " + dme.getVersion() + ", payload: "
+ TextFormat.shortDebugString(shufflePayload));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 1f0f590..05b9761 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -89,6 +88,9 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final ExecutorService fetcherRawExecutor;
private final ListeningExecutorService fetcherExecutor;
+ private final ExecutorService schedulerRawExecutor;
+ private final ListeningExecutorService schedulerExecutor;
+
private final BlockingQueue<FetchedInput> completedInputs;
private final Set<InputIdentifier> completedInputSet;
private final ConcurrentMap<String, InputHost> knownSrcHosts;
@@ -99,9 +101,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final long startTime;
private long lastProgressTime;
-
- private FutureTask<Void> runShuffleFuture;
-
+
// Required to be held when manipulating pendingHosts
private ReentrantLock lock = new ReentrantLock();
private Condition wakeLoop = lock.newCondition();
@@ -143,11 +143,24 @@ public class BroadcastShuffleManager implements FetcherCallback {
this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
- this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
+ this.fetcherRawExecutor = Executors.newFixedThreadPool(
+ numFetchers,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(
+ "Fetcher [" + inputContext.getUniqueIdentifier() + "] #%d")
.build());
this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+ this.schedulerRawExecutor = Executors.newFixedThreadPool(
+ 1,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(
+ "ShuffleRunner [" + inputContext.getUniqueIdentifier() + "]")
+ .build());
+ this.schedulerExecutor = MoreExecutors.listeningDecorator(schedulerRawExecutor);
+
this.startTime = System.currentTimeMillis();
this.lastProgressTime = startTime;
@@ -178,10 +191,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
public void run() {
RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
- runShuffleFuture = new FutureTask<Void>(callable);
- Thread runThread = new Thread(runShuffleFuture, "ShuffleRunner");
- runThread.setDaemon(true);
- runThread.start();
+ ListenableFuture<Void> runShuffleFuture = schedulerExecutor
+ .submit(callable);
+ Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
+ // Shutdown this executor once this task, and the callback complete.
+ schedulerExecutor.shutdown();
}
private class RunBroadcastShuffleCallable implements Callable<Void> {
@@ -209,22 +223,21 @@ public class BroadcastShuffleManager implements FetcherCallback {
if (numCompletedInputs.get() < numInputs) {
lock.lock();
try {
- int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers
- - numRunningFetchers.get());
+ int maxFetchersToRun = numFetchers - numRunningFetchers.get();
int count = 0;
while (pendingHosts.peek() != null) {
InputHost inputHost = pendingHosts.take();
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing pending host: " + inputHost);
+ LOG.debug("Processing pending host: " + inputHost.toDetailedString());
}
if (inputHost.getNumPendingInputs() > 0) {
- LOG.info("Scheduling fetch for inputHost: " + inputHost);
+ LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
Fetcher fetcher = constructFetcherForHost(inputHost);
numRunningFetchers.incrementAndGet();
ListenableFuture<FetchResult> future = fetcherExecutor
.submit(fetcher);
Futures.addCallback(future, fetchFutureCallback);
- if (++count >= numFetchersToRun) {
+ if (++count >= maxFetchersToRun) {
break;
}
} else {
@@ -276,6 +289,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
// fetcher, especially in the case where #hosts < #fetchers
fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
pendingInputsForHost);
+ LOG.info("Created Fetcher for host: " + inputHost.getHost()
+ + ", with inputs: " + pendingInputsForHost);
return fetcherBuilder.build();
}
@@ -502,6 +517,21 @@ public class BroadcastShuffleManager implements FetcherCallback {
}
+ private class SchedulerFutureCallback implements FutureCallback<Void> {
+
+ @Override
+ public void onSuccess(Void result) {
+ LOG.info("Scheduler thread completed");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Scheduler failed with error: ", t);
+ inputContext.fatalError(t, "Broadcast Scheduler Failed");
+ }
+
+ }
+
private class FetchFutureCallback implements FutureCallback<FetchResult> {
private void doBookKeepingForFetcherComplete() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index d731a46..8fae1c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -74,9 +74,8 @@ public class ShuffleInputEventHandler {
}
int partitionId = dmEvent.getSourceIndex();
URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
- LOG.info("Data movement event baseUri:" + baseUri);
-
InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+ LOG.info("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier);
scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
// TODO NEWTEZ See if this duration hack can be removed.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2f207c0d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 4759a8b..7905e27 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -19,8 +19,9 @@
package org.apache.tez.runtime.library.shuffle.common;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
@@ -30,7 +31,7 @@ public class InputHost {
private final String host;
private final int port;
- private final List<InputAttemptIdentifier> inputs = new LinkedList<InputAttemptIdentifier>();
+ private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
public InputHost(String hostName, int port, ApplicationId appId) {
this.host = hostName;
@@ -45,18 +46,18 @@ public class InputHost {
return this.port;
}
- public synchronized int getNumPendingInputs() {
+ public int getNumPendingInputs() {
return inputs.size();
}
- public synchronized void addKnownInput(InputAttemptIdentifier srcAttempt) {
+ public void addKnownInput(InputAttemptIdentifier srcAttempt) {
inputs.add(srcAttempt);
}
- public synchronized List<InputAttemptIdentifier> clearAndGetPendingInputs() {
+ public List<InputAttemptIdentifier> clearAndGetPendingInputs() {
List<InputAttemptIdentifier> inputsCopy = new ArrayList<InputAttemptIdentifier>(
- inputs);
- inputs.clear();
+ inputs.size());
+ inputs.drainTo(inputsCopy);
return inputsCopy;
}
@@ -88,10 +89,13 @@ public class InputHost {
return true;
}
- @Override
- public String toString() {
+ public String toDetailedString() {
return "InputHost [host=" + host + ", port=" + port + ", inputs=" + inputs
+ "]";
}
-
+
+ @Override
+ public String toString() {
+ return "InputHost [host=" + host + ", port=" + port + "]";
+ }
}