You are viewing a plain-text version of this content. The canonical link to it (originally linked as "here") was not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/27 00:04:09 UTC
git commit: TEZ-647. Add support for configurable max app attempts for
Tez applications (bikas)
Updated Branches:
refs/heads/master f2680cb34 -> fc3a9480e
TEZ-647. Add support for configurable max app attempts for Tez applications (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/fc3a9480
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/fc3a9480
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/fc3a9480
Branch: refs/heads/master
Commit: fc3a9480e587929847ceabf8afe2c0a91349d101
Parents: f2680cb
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Nov 26 15:00:58 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Nov 26 15:00:58 2013 -0800
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 4 ++++
.../java/org/apache/tez/client/TezSession.java | 1 -
.../apache/tez/dag/api/TezConfiguration.java | 4 ++++
.../dag/app/dag/impl/ShuffleVertexManager.java | 20 +++++++++++++-------
.../apache/tez/mapreduce/client/YARNRunner.java | 2 ++
.../tez/mapreduce/hadoop/MRJobConfig.java | 3 +++
6 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 0d90977..d72ecc4 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -451,6 +451,10 @@ public class TezClientUtils {
TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
appContext.setAMContainerSpec(amContainer);
+
+ appContext.setMaxAppAttempts(
+ finalTezConf.getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
+ TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
return appContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index d9eba37..b02d345 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -36,7 +36,6 @@ import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index c41a9ea..fb3c9a9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -68,6 +68,10 @@ public class TezConfiguration extends Configuration {
+ "maxtaskfailures.per.node";
public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 3;
+ public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX +
+ "max.app.attempts";
+ public static int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
+
public static final String TEZ_AM_MAX_TASK_ATTEMPTS =
TEZ_AM_PREFIX + "max.task.attempts";
public static final int TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT = 4;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index eda2dd5..26ee64b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -286,15 +286,15 @@ public class ShuffleVertexManager implements VertexScheduler {
int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
(numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
+ LOG.info("Reduce auto parallelism for vertex: " + managedVertex.getVertexId()
+ + " to " + finalTaskParallelism + " from " + pendingTasks.size()
+ + " . Expected output: " + expectedTotalSourceTasksOutputSize
+ + " based on actual output: " + completedSourceTasksOutputSize
+ + " from " + numVertexManagerEventsReceived + " vertex manager events. "
+ + " desiredTaskInputSize: " + desiredTaskInputDataSize);
+
if(finalTaskParallelism < currentParallelism) {
// final parallelism is less than actual parallelism
- LOG.info("Reducing parallelism for vertex: " + managedVertex.getVertexId()
- + " to " + finalTaskParallelism + " from " + pendingTasks.size()
- + " . Expected output: " + expectedTotalSourceTasksOutputSize
- + " based on actual output: " + completedSourceTasksOutputSize
- + " from " + numVertexManagerEventsReceived + " vertex manager events. "
- + " desiredTaskInputSize: " + desiredTaskInputDataSize);
-
Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
bipartiteSources.size());
for(Vertex vertex : bipartiteSources.values()) {
@@ -425,6 +425,12 @@ public class ShuffleVertexManager implements VertexScheduler {
minTaskParallelism = conf.getInt(
TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT);
+ LOG.info("Shuffle Vertex Manager: settings" +
+ " minFrac:" + slowStartMinSrcCompletionFraction +
+ " maxFrac:" + slowStartMaxSrcCompletionFraction +
+ " auto:" + enableAutoParallelism +
+ " desiredTaskIput:" + desiredTaskInputDataSize +
+ " minTasks:" + minTaskParallelism);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 371c551..cc4d1ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -594,6 +594,8 @@ public class YARNRunner implements ClientProtocol {
dagAMConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, amMemMB);
dagAMConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, amCores);
+ dagAMConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
+ jobConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
AMConfiguration amConfig = new AMConfiguration(
environment,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 0974080..285f94e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -628,6 +628,9 @@ public interface MRJobConfig {
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
};
+ public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts";
+
+ public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2;
// TODO Fix this. Not accessible in JobClient
/* do we need a HS delegation token for this client */