You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/22 01:51:56 UTC
git commit: TEZ-598. ShuffleHandler should handle invalid headers correctly. (sseth)
Updated Branches:
refs/heads/master 3d646ab2a -> 6ccebe747
TEZ-598. ShuffleHandler should handle invalid headers correctly. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6ccebe74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6ccebe74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6ccebe74
Branch: refs/heads/master
Commit: 6ccebe747217adcef05a7137eb8844c11e8ae18c
Parents: 3d646ab
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Nov 21 16:51:39 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Nov 21 16:51:39 2013 -0800
----------------------------------------------------------------------
.../dag/app/TaskAttemptListenerImpTezDag.java | 10 ++++----
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 ++++
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 ++-
.../apache/tez/dag/app/rm/TaskScheduler.java | 11 +++++++++
.../library/common/shuffle/impl/Fetcher.java | 25 ++++++++++++++++----
.../common/shuffle/impl/ShuffleScheduler.java | 8 +++----
.../shuffle/common/DiskFetchedInput.java | 2 ++
.../runtime/library/shuffle/common/Fetcher.java | 3 ---
8 files changed, 46 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index f4f32fb..7741b71 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -187,15 +187,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
LOG.debug("Container with id: " + containerId + " asked for a task");
}
if (!registeredContainers.containsKey(containerId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Container " + containerId + " is no longer registered");
- }
- if(context.getAllContainers().get(containerId) == null)
+ if(context.getAllContainers().get(containerId) == null) {
LOG.info("Container with id: " + containerId
+ " is invalid and will be killed");
- else
+ } else {
LOG.info("Container with id: " + containerId
- + " is valid and will be killed");
+ + " is valid, but no longer registered, and will be killed");
+ }
task = TASK_FOR_INVALID_JVM;
} else {
pingContainerHeartbeatHandler(containerId);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 8a587c1..76c36c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -307,6 +307,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
this.attempts = Collections.emptyMap();
+ // TODO Avoid reading this from configuration for each task.
maxAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS,
TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT);
taskId = new TezTaskID(vertexId, taskIndex);
@@ -1048,6 +1049,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// we don't need a new event if we already have a spare
if (--task.numberUncompletedAttempts == 0
&& task.successfulAttempt == null) {
+ LOG.info("Scheduling new attempt for task: " + task.getTaskId()
+ + ", currentFailedAttempts: " + task.failedAttempts + ", maxAttempts: "
+ + task.maxAttempts);
task.addAndScheduleAttempt();
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index cce2043..3eadded 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1727,7 +1727,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
public VertexState transition(VertexImpl vertex, VertexEvent event) {
boolean forceTransitionToKillWait = false;
vertex.completedTaskCount++;
- LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
+ LOG.info("Num completed Tasks for " + vertex.getVertexId() + " [" + vertex.getName() + "] : "
+ + vertex.completedTaskCount);
VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
Task task = vertex.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 6eedb15..d8d2de3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -446,6 +446,14 @@ public class TaskScheduler extends AbstractService
}
Map<CookieContainerRequest, Container> assignedContainers;
+ if (LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder();
+ for (Container container: containers) {
+ sb.append(container.getId()).append(", ");
+ }
+ LOG.debug("Assigned New Containers: " + sb.toString());
+ }
+
synchronized (this) {
if (!shouldReuseContainers) {
List<Container> modifiableContainerList = Lists.newLinkedList(containers);
@@ -1423,6 +1431,9 @@ public class TaskScheduler extends AbstractService
if (heldContainer == null) {
continue;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Considering HeldContainer: " + heldContainer + " for assignment");
+ }
long currentTs = System.currentTimeMillis();
long nextScheduleTs = heldContainer.getNextScheduleTime();
if (currentTs >= nextScheduleTs) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index fd88973..e8038a4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -27,7 +27,7 @@ import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -93,6 +93,8 @@ class Fetcher extends Thread {
private static boolean sslShuffle;
private static SSLFactory sslFactory;
+
+ private LinkedHashSet<InputAttemptIdentifier> remaining;
public Fetcher(Configuration job,
ShuffleScheduler scheduler, MergeManager merger,
@@ -157,6 +159,7 @@ class Fetcher extends Thread {
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
+ remaining = null; // Safety.
MapHost host = null;
try {
// If merge is on, block
@@ -233,7 +236,7 @@ class Fetcher extends Thread {
}
// List of maps to be fetched yet
- Set<InputAttemptIdentifier> remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
+ remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
// Construct the url and connect
DataInputStream input;
@@ -319,7 +322,7 @@ class Fetcher extends Thread {
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptIdentifier[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
- failedTasks = copyMapOutput(host, input, remaining);
+ failedTasks = copyMapOutput(host, input);
}
if(failedTasks != null && failedTasks.length > 0) {
@@ -346,8 +349,7 @@ class Fetcher extends Thread {
private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
private InputAttemptIdentifier[] copyMapOutput(MapHost host,
- DataInputStream input,
- Set<InputAttemptIdentifier> remaining) {
+ DataInputStream input) {
MapOutput mapOutput = null;
InputAttemptIdentifier srcAttemptId = null;
long decompressedLength = -1;
@@ -376,6 +378,11 @@ class Fetcher extends Thread {
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, srcAttemptId)) {
+ if (srcAttemptId == null) {
+ LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+ srcAttemptId = getNextRemainingAttempt();
+ }
+ assert(srcAttemptId != null);
return new InputAttemptIdentifier[] {srcAttemptId};
}
@@ -474,6 +481,14 @@ class Fetcher extends Thread {
return true;
}
+
+ private InputAttemptIdentifier getNextRemainingAttempt() {
+ if (remaining.size() > 0) {
+ return remaining.iterator().next();
+ } else {
+ return null;
+ }
+ }
/**
* Create the map-output-url. This will contain all the map ids
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 33da660..db4c794 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -189,15 +189,13 @@ class ShuffleScheduler {
hostFailures.put(hostname, new IntWritable(1));
}
if (failures >= abortFailureLimit) {
- try {
- throw new IOException(failures
+ IOException ioe = new IOException(failures
+ " failures downloading "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
srcAttempt.getAttemptNumber()));
- } catch (IOException ie) {
- shuffle.reportException(ie);
- }
+ ioe.fillInStackTrace();
+ shuffle.reportException(ioe);
}
checkAndInformJobTracker(failures, srcAttempt, readError);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 9aeb65d..06b9314 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -51,6 +51,8 @@ public class DiskFetchedInput extends FetchedInput {
this.localFS = FileSystem.getLocal(conf);
this.outputPath = filenameAllocator.getInputFileForWrite(
this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), actualSize);
+ // Files are not clobbered due to the id being appended to the outputPath in the tmpPath,
+ // otherwise fetches for the same task but from different attempts would clobber each other.
this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6ccebe74/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 70059b0..1231dc3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -151,7 +151,6 @@ public class Fetcher implements Callable<FetchResult> {
for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
.hasNext();) {
fetcherCallback.fetchFailed(host, leftIter.next(), true);
- leftIter.remove();
}
return new FetchResult(host, port, partition, remaining);
}
@@ -170,7 +169,6 @@ public class Fetcher implements Callable<FetchResult> {
LOG.warn("Fetch Failure from host while connecting: " + host
+ ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e);
fetcherCallback.fetchFailed(host, firstAttempt, false);
- remaining.remove(firstAttempt);
return new FetchResult(host, port, partition, remaining);
}
@@ -190,7 +188,6 @@ public class Fetcher implements Callable<FetchResult> {
LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
for (InputAttemptIdentifier left : failedInputs) {
fetcherCallback.fetchFailed(host, left, false);
- remaining.remove(left);
}
}