You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/27 00:04:09 UTC

git commit: TEZ-647. Add support for configurable max app attempts for Tez applications (bikas)

Updated Branches:
  refs/heads/master f2680cb34 -> fc3a9480e


TEZ-647. Add support for configurable max app attempts for Tez applications (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/fc3a9480
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/fc3a9480
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/fc3a9480

Branch: refs/heads/master
Commit: fc3a9480e587929847ceabf8afe2c0a91349d101
Parents: f2680cb
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Nov 26 15:00:58 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Nov 26 15:00:58 2013 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  4 ++++
 .../java/org/apache/tez/client/TezSession.java  |  1 -
 .../apache/tez/dag/api/TezConfiguration.java    |  4 ++++
 .../dag/app/dag/impl/ShuffleVertexManager.java  | 20 +++++++++++++-------
 .../apache/tez/mapreduce/client/YARNRunner.java |  2 ++
 .../tez/mapreduce/hadoop/MRJobConfig.java       |  3 +++
 6 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 0d90977..d72ecc4 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -451,6 +451,10 @@ public class TezClientUtils {
         TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
         TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT));
     appContext.setAMContainerSpec(amContainer);
+    
+    appContext.setMaxAppAttempts(
+        finalTezConf.getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 
+            TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
 
     return appContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index d9eba37..b02d345 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -36,7 +36,6 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index c41a9ea..fb3c9a9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -68,6 +68,10 @@ public class TezConfiguration extends Configuration {
       + "maxtaskfailures.per.node";
   public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 3;
 
+  public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX + 
+      "max.app.attempts";
+  public static int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
+  
   public static final String TEZ_AM_MAX_TASK_ATTEMPTS =
       TEZ_AM_PREFIX + "max.task.attempts";
   public static final int TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT = 4;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index eda2dd5..26ee64b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -286,15 +286,15 @@ public class ShuffleVertexManager implements VertexScheduler {
     int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
           (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
 
+    LOG.info("Reduce auto parallelism for vertex: " + managedVertex.getVertexId() 
+        + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
+        + " . Expected output: " + expectedTotalSourceTasksOutputSize 
+        + " based on actual output: " + completedSourceTasksOutputSize
+        + " from " + numVertexManagerEventsReceived + " vertex manager events. "
+        + " desiredTaskInputSize: " + desiredTaskInputDataSize);
+          
     if(finalTaskParallelism < currentParallelism) {
       // final parallelism is less than actual parallelism
-      LOG.info("Reducing parallelism for vertex: " + managedVertex.getVertexId() 
-          + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
-          + " . Expected output: " + expectedTotalSourceTasksOutputSize 
-          + " based on actual output: " + completedSourceTasksOutputSize
-          + " from " + numVertexManagerEventsReceived + " vertex manager events. "
-          + " desiredTaskInputSize: " + desiredTaskInputDataSize);
-      
       Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
           bipartiteSources.size());
       for(Vertex vertex : bipartiteSources.values()) {
@@ -425,6 +425,12 @@ public class ShuffleVertexManager implements VertexScheduler {
     minTaskParallelism = conf.getInt(
             TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
             TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT);
+    LOG.info("Shuffle Vertex Manager: settings" + 
+            " minFrac:" + slowStartMinSrcCompletionFraction +
+            " maxFrac:" + slowStartMaxSrcCompletionFraction +
+            " auto:" + enableAutoParallelism + 
+            " desiredTaskIput:" + desiredTaskInputDataSize +
+            " minTasks:" + minTaskParallelism);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 371c551..cc4d1ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -594,6 +594,8 @@ public class YARNRunner implements ClientProtocol {
       dagAMConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, amMemMB);
       dagAMConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, amCores);
 
+      dagAMConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 
+          jobConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
       
       AMConfiguration amConfig = new AMConfiguration(
           environment,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/fc3a9480/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 0974080..285f94e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -628,6 +628,9 @@ public interface MRJobConfig {
       "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
   };
 
+  public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts";
+
+  public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 2;
 
   // TODO Fix this. Not accessible in JobClient
   /* do we need a HS delegation token for this client */