You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/08 08:44:13 UTC
git commit: TEZ-536. Fix a couple of bugs in the
BroadcastShuffleManager which were causing it to not fetch inputs at times.
(sseth)
Updated Branches:
refs/heads/master 4dd5e195d -> a181fba3f
TEZ-536. Fix a couple of bugs in the BroadcastShuffleManager which were
causing it to not fetch inputs at times. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a181fba3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a181fba3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a181fba3
Branch: refs/heads/master
Commit: a181fba3f72117d4658751fce79ed9db920c6343
Parents: 4dd5e19
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Oct 7 23:43:29 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Oct 7 23:43:29 2013 -0700
----------------------------------------------------------------------
.../apache/tez/runtime/api/impl/TaskSpec.java | 2 +-
.../BroadcastShuffleInputEventHandler.java | 3 +-
.../input/BroadcastShuffleManager.java | 85 ++++++++++++--------
.../broadcast/output/FileBasedKVWriter.java | 2 +
4 files changed, 55 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 6e0995a..534de99 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -128,7 +128,7 @@ public class TaskSpec implements Writable {
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("TaskAttemptID:" + taskAttemptId);
- sb.append("processorName=" + processorDescriptor.getClassName()
+ sb.append(", processorName=" + processorDescriptor.getClassName()
+ ", inputSpecListSize=" + inputSpecList.size()
+ ", outputSpecListSize=" + outputSpecList.size());
sb.append(", inputSpecList=[");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index 38f6e6c..cd50ec6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -29,7 +29,6 @@ import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandler;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import com.google.common.base.Preconditions;
@@ -38,7 +37,7 @@ import com.google.protobuf.TextFormat;
public class BroadcastShuffleInputEventHandler {
- private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
+ private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
private final BroadcastShuffleManager shuffleManager;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index fd47757..82c7f37 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
@@ -90,9 +91,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
private final BlockingQueue<FetchedInput> completedInputs;
private final Set<InputIdentifier> completedInputSet;
-// private final Set<InputIdentifier> pendingInputs;
private final ConcurrentMap<String, InputHost> knownSrcHosts;
- private final Set<InputHost> pendingHosts;
+ private final BlockingQueue<InputHost> pendingHosts;
private final Set<InputAttemptIdentifier> obsoletedInputs;
private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
@@ -133,7 +133,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
- pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
+ pendingHosts = new LinkedBlockingQueue<InputHost>();
obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
int maxConfiguredFetchers =
@@ -196,41 +196,50 @@ public class BroadcastShuffleManager implements FetcherCallback {
} finally {
lock.unlock();
}
- if (shuffleError != null) {
- // InputContext has already been informed of a fatal error.
- // Initiate shutdown.
- break;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("NumCompletedInputs: " + numCompletedInputs);
- }
- if (numCompletedInputs.get() < numInputs) {
- lock.lock();
- try {
- int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
- int count = 0;
- for (Iterator<InputHost> inputHostIter = pendingHosts.iterator() ; inputHostIter.hasNext() ; ) {
- InputHost inputHost = inputHostIter.next();
- inputHostIter.remove();
- if (inputHost.getNumPendingInputs() > 0) {
- Fetcher fetcher = constructFetcherForHost(inputHost);
- LOG.info("Scheduling fetch for inputHost: " + inputHost);
- numRunningFetchers.incrementAndGet();
- ListenableFuture<FetchResult> future = fetcherExecutor
- .submit(fetcher);
- Futures.addCallback(future, fetchFutureCallback);
- if (++count >= numFetchersToRun) {
- break;
- }
+ }
+ if (shuffleError != null) {
+ // InputContext has already been informed of a fatal error.
+ // Initiate shutdown.
+ break;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+ }
+ if (numCompletedInputs.get() < numInputs) {
+ lock.lock();
+ try {
+ int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers
+ - numRunningFetchers.get());
+ int count = 0;
+ while (pendingHosts.peek() != null) {
+ InputHost inputHost = pendingHosts.take();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing pending host: " + inputHost);
+ }
+ if (inputHost.getNumPendingInputs() > 0) {
+ LOG.info("Scheduling fetch for inputHost: " + inputHost);
+ Fetcher fetcher = constructFetcherForHost(inputHost);
+ numRunningFetchers.incrementAndGet();
+ ListenableFuture<FetchResult> future = fetcherExecutor
+ .submit(fetcher);
+ Futures.addCallback(future, fetchFutureCallback);
+ if (++count >= numFetchersToRun) {
+ break;
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping host: " + inputHost.getHost()
+ + " since it has know inputs to process");
}
}
- } finally {
- lock.unlock();
}
+ } finally {
+ lock.unlock();
}
}
}
+ LOG.info("Shutting down FetchScheduler");
// TODO NEWTEZ Maybe clean up inputs.
if (!fetcherExecutor.isShutdown()) {
fetcherExecutor.shutdownNow();
@@ -282,10 +291,18 @@ public class BroadcastShuffleManager implements FetcherCallback {
host = old;
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding input: " + srcAttemptIdentifier + ", to host: " + host);
+ }
host.addKnownInput(srcAttemptIdentifier);
lock.lock();
try {
- pendingHosts.add(host);
+ boolean added = pendingHosts.offer(host);
+ if (!added) {
+ String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
+ LOG.error(errorMessage);
+ throw new TezUncheckedException(errorMessage);
+ }
wakeLoop.signal();
} finally {
lock.unlock();
@@ -335,7 +352,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
long copyDuration) throws IOException {
InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
- LOG.info("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+ LOG.info("Completed fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
// Count irrespective of whether this is a copy of an already fetched input
lock.lock();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a181fba3/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 9941de0..e6dc581 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -104,6 +104,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
Path indexFile = ouputFileManager
.getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
+ LOG.info("Writing index file: " + indexFile);
sr.writeToFile(indexFile, conf);
return numRecords > 0;
}
@@ -116,6 +117,7 @@ public class FileBasedKVWriter implements KeyValueWriter {
public void initWriter() throws IOException {
Path outputFile = ouputFileManager.getOutputFileForWrite();
+ LOG.info("Writing data file: " + outputFile);
// TODO NEWTEZ Consider making the buffer size configurable. Also consider
// setting up an in-memory buffer which is occasionally flushed to disk so