You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/02/11 03:13:02 UTC

tez git commit: TEZ-1233. Allow configuration of framework parameters per vertex (zjffdu)

Repository: tez
Updated Branches:
  refs/heads/master fe39ede33 -> eb9a0345c


TEZ-1233. Allow configuration of framework parameters per vertex (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/eb9a0345
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/eb9a0345
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/eb9a0345

Branch: refs/heads/master
Commit: eb9a0345c8617214be6e1cc331efb553af3e1ccc
Parents: fe39ede
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 11 10:10:51 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 11 10:11:41 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/ConfigurationScope.java  |  33 ++++
 .../main/java/org/apache/tez/dag/api/DAG.java   |  58 ++++---
 .../main/java/org/apache/tez/dag/api/Scope.java |  28 ++++
 .../apache/tez/dag/api/TezConfiguration.java    | 150 ++++++++++++++++++-
 .../java/org/apache/tez/dag/api/Vertex.java     |  11 ++
 tez-api/src/main/proto/DAGApiRecords.proto      |   3 +-
 .../java/org/apache/tez/dag/api/TestDAG.java    |  50 +++++++
 .../org/apache/tez/dag/api/TestDAGVerify.java   |   2 +-
 .../tez/dag/api/TestTezConfiguration.java       |  32 ++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  11 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   2 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  29 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  34 +++--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  32 +++-
 15 files changed, 419 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9979c50..22ecb2a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1233. Allow configuration of framework parameters per vertex.
   TEZ-2045. TaskAttemptListener should not pull Tasks from AMContainer. Instead these should be registered with the listener.
   TEZ-1914. VertexManager logic should not run on the central dispatcher
   TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java b/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java
new file mode 100644
index 0000000..bf2a7d9
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface ConfigurationScope {
+
+  Scope value();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 7be5ba4..914f946 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -88,6 +88,7 @@ public class DAG {
   private DAGAccessControls dagAccessControls;
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
   String dagInfo;
+  private Map<String,String> dagConf = new HashMap<String, String>();
 
   private Stack<String> topologicalVertexStack = new Stack<String>();
 
@@ -322,7 +323,13 @@ public class DAG {
   public String getName() {
     return this.name;
   }
-  
+
+  public DAG setConf(String property, String value) {
+    TezConfiguration.validateProperty(property, Scope.DAG);
+    dagConf.put(property, value);
+    return this;
+  }
+
   @Private
   public Map<String, LocalResource> getTaskLocalFiles() {
     return commonTaskLocalFiles;
@@ -657,22 +664,21 @@ public class DAG {
   }
 
   // create protobuf message describing DAG
-  public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+  public DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
                            Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
                            boolean tezLrsAsArchive) {
-    return createDag(dagConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
+    return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
         null);
   }
 
   // create protobuf message describing DAG
   @Private
-  public synchronized DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+  public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
       boolean tezLrsAsArchive, Map<String, String> additionalConfigs) {
     verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
-
     dagBuilder.setName(this.name);
     if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
       dagBuilder.setDagInfo(this.dagInfo);
@@ -715,9 +721,9 @@ public class DAG {
       // infer credentials, resources and parallelism from data source
       Resource vertexTaskResource = vertex.getTaskResource();
       if (vertexTaskResource == null) {
-        vertexTaskResource = Resource.newInstance(dagConf.getInt(
+        vertexTaskResource = Resource.newInstance(tezConf.getInt(
             TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
-            TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
+            TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), tezConf.getInt(
             TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
             TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
       }
@@ -782,13 +788,24 @@ public class DAG {
         }
       }
 
+      if (vertex.getConf()!= null && vertex.getConf().size() > 0) {
+        ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder();
+        for (Map.Entry<String, String> entry : vertex.getConf().entrySet()) {
+          PlanKeyValuePair.Builder keyValueBuilder = PlanKeyValuePair.newBuilder();
+          keyValueBuilder.setKey(entry.getKey());
+          keyValueBuilder.setValue(entry.getValue());
+          confBuilder.addConfKeyValues(keyValueBuilder);
+        }
+        vertexBuilder.setVertexConf(confBuilder);
+      }
+
       //task config
       PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
       taskConfigBuilder.setNumTasks(vertexParallelism);
       taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory());
       taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores());
       taskConfigBuilder.setJavaOpts(
-          TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), dagConf));
+          TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), tezConf));
 
       taskConfigBuilder.setTaskModule(vertex.getName());
       if (!vertexLRs.isEmpty()) {
@@ -796,7 +813,7 @@ public class DAG {
       }
 
       Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment());
-      TezYARNUtils.setupDefaultEnv(taskEnv, dagConf,
+      TezYARNUtils.setupDefaultEnv(taskEnv, tezConf,
           TezConfiguration.TEZ_TASK_LAUNCH_ENV,
           TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
       for (Map.Entry<String, String> entry : taskEnv.entrySet()) {
@@ -866,16 +883,6 @@ public class DAG {
 
     ConfigurationProto.Builder confProtoBuilder =
         ConfigurationProto.newBuilder();
-    if (dagConf != null) {
-      Iterator<Entry<String, String>> iter = dagConf.iterator();
-      while (iter.hasNext()) {
-        Entry<String, String> entry = iter.next();
-        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-        kvp.setKey(entry.getKey());
-        kvp.setValue(entry.getValue());
-        confProtoBuilder.addConfKeyValues(kvp);
-      }
-    }
     if (dagAccessControls != null) {
       Configuration aclConf = new Configuration(false);
       dagAccessControls.serializeToConfiguration(aclConf);
@@ -885,6 +892,7 @@ public class DAG {
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
         kvp.setKey(entry.getKey());
         kvp.setValue(entry.getValue());
+        TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
         confProtoBuilder.addConfKeyValues(kvp);
       }
     }
@@ -893,11 +901,19 @@ public class DAG {
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
         kvp.setKey(entry.getKey());
         kvp.setValue(entry.getValue());
+        TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+    }
+    if (this.dagConf != null && !this.dagConf.isEmpty()) {
+      for (Entry<String, String> entry : this.dagConf.entrySet()) {
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
         confProtoBuilder.addConfKeyValues(kvp);
       }
     }
-    dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
-    // should this replace BINARY_PB_CONF???
+    dagBuilder.setDagConf(confProtoBuilder);
 
     if (dagCredentials != null) {
       dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
new file mode 100644
index 0000000..d862e8f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public enum Scope {
+  // DO NOT CHANGE THE ORDER
+  AM,       // can only been set at AM level 
+  DAG,      // can been set at AM/DAG level
+  VERTEX,   // can been set at AM/DAG/VERTEX level
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index bff5c6a..cfd6426 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -18,6 +18,13 @@
 
 package org.apache.tez.dag.api;
 
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -25,6 +32,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 
+import com.google.common.annotations.VisibleForTesting;
+
+
 /**
  * Defines the configurations for Tez. These configurations are typically specified in 
  * tez-site.xml on the client machine where TezClient is used to launch the Tez application.
@@ -33,7 +43,11 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 @Public
 public class TezConfiguration extends Configuration {
 
-  public final static String TEZ_SITE_XML = "tez-site.xml";
+  private final static String TEZ_SITE_XML = "tez-site.xml";
+
+  private final static Log LOG = LogFactory.getLog(TezConfiguration.class);
+
+  private static Map<String, Scope> PropertyScope = new HashMap<String, Scope>();
 
   static {
     Configuration.addDeprecation("tez.am.counters.max.keys", TezConfiguration.TEZ_COUNTERS_MAX);
@@ -60,6 +74,24 @@ public class TezConfiguration extends Configuration {
 
     Configuration.addDeprecation("tez.task.max-events-per-heartbeat.max",
         TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT);
+
+    for (Field field : TezConfiguration.class.getFields()) {
+      if (field.isAnnotationPresent(ConfigurationScope.class)) {
+        ConfigurationScope confScope = field.getAnnotation(ConfigurationScope.class);
+        if (field.getType() == String.class) {
+          try {
+            PropertyScope.put(field.get(null).toString(), confScope.value());
+          } catch (IllegalArgumentException e) {
+            throw new RuntimeException(e);
+          } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+          }
+        } else {
+          throw new RuntimeException(field.getName() + " is not String type, should not been annotated with "
+              + ConfigurationScope.class.getSimpleName());
+        }
+      }
+    }
   }
 
   public TezConfiguration() {
@@ -89,6 +121,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value. If true then Tez will try to automatically delete temporary job 
    * artifacts that it creates within the specified staging dir. Does not affect any user data.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE = TEZ_AM_PREFIX +
       "staging.scratch-data.auto-delete";
   public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true;
@@ -96,6 +129,7 @@ public class TezConfiguration extends Configuration {
   /**
    * String value. Specifies a directory where Tez can create temporary job artifacts.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
   public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/"
       + System.getProperty("user.name") + "/tez/staging";
@@ -104,6 +138,7 @@ public class TezConfiguration extends Configuration {
    * String value that is a file path.
    * Path to a credentials file (with serialized credentials) located on the local file system.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path";
 
   /**
@@ -114,6 +149,7 @@ public class TezConfiguration extends Configuration {
    * by the same user. For long running applications, one-off executions, batch jobs etc non-session 
    * mode is recommended. If session mode is enabled then container reuse is recommended.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_SESSION_MODE = TEZ_AM_PREFIX + "mode.session";
   public static final boolean TEZ_AM_SESSION_MODE_DEFAULT = false;
 
@@ -133,6 +169,7 @@ public class TezConfiguration extends Configuration {
    *   DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid
    *   org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid
    * */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX + "log.level";
   public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
 
@@ -152,6 +189,7 @@ public class TezConfiguration extends Configuration {
    *   DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid
    *   org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid
    * */
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_LOG_LEVEL = TEZ_TASK_PREFIX + "log.level";
   public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO";
 
@@ -163,6 +201,7 @@ public class TezConfiguration extends Configuration {
    * vertex succeeds. Depending on the desired output visibility and downstream consumer dependencies
    * this value must be appropriately chosen. Defaults to the safe choice of true.
    */
+  @ConfigurationScope(Scope.DAG)
   public static final String TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS =
       TEZ_AM_PREFIX + "commit-all-outputs-on-dag-success";
   public static final boolean TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT = true;
@@ -173,6 +212,7 @@ public class TezConfiguration extends Configuration {
    * include default options meant to be used by all jobs in a cluster. If required, the values can
    * be overridden per job.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS =
       TEZ_AM_PREFIX + "launch.cluster-default.cmd-opts";
   public static final String TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT =
@@ -183,6 +223,7 @@ public class TezConfiguration extends Configuration {
    * AppMaster process. Its recommended to not set any Xmx or Xms in these launch opts so that
    * Tez can determine them automatically.
    * */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_LAUNCH_CMD_OPTS = TEZ_AM_PREFIX +  "launch.cmd-opts";
   public static final String TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT = 
       "-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC";
@@ -193,6 +234,7 @@ public class TezConfiguration extends Configuration {
    * include default options meant to be used by all jobs in a cluster. If required, the values can
    * be overridden per job.
    */
+  @ConfigurationScope(Scope.AM) // TODO DAG/Vertex level
   public static final String TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS =
       TEZ_TASK_PREFIX + "launch.cluster-default.cmd-opts";
   public static final String TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT =
@@ -203,6 +245,7 @@ public class TezConfiguration extends Configuration {
    * processes. Its recommended to not set any Xmx or Xms in these launch opts
    * so that Tez can determine them automatically.
    */
+  @ConfigurationScope(Scope.AM) // TODO DAG/Vertex level
   public static final String TEZ_TASK_LAUNCH_CMD_OPTS = TEZ_TASK_PREFIX
       + "launch.cmd-opts";
   public static final String TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT =
@@ -217,6 +260,7 @@ public class TezConfiguration extends Configuration {
    * fraction that is applied to the memory allocated Factor to size Xmx based
    * on container memory size. Value should be greater than 0 and less than 1.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION =
       TEZ_PREFIX + "container.max.java.heap.fraction";
   public static final double TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT = 0.8;
@@ -232,6 +276,7 @@ public class TezConfiguration extends Configuration {
    * These take least precedence compared to other methods of setting env.
    * These get added to the app master environment prior to launching it.
   */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_LAUNCH_ENV = TEZ_AM_PREFIX
       + "launch.env";
   public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = NATIVE_LIB_PARAM_DEFAULT;
@@ -243,11 +288,13 @@ public class TezConfiguration extends Configuration {
    * These take least precedence compared to other methods of setting env
    * These get added to the task environment prior to launching it.
    */
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_LAUNCH_ENV = TEZ_TASK_PREFIX
       + "launch.env";
   public static final String TEZ_TASK_LAUNCH_ENV_DEFAULT = NATIVE_LIB_PARAM_DEFAULT;
 
   @Private
+  @ConfigurationScope(Scope.DAG)
   public static final String TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION = TEZ_PREFIX +
       "cancel.delegation.tokens.on.completion";
   public static final boolean TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION_DEFAULT = true;
@@ -256,6 +303,7 @@ public class TezConfiguration extends Configuration {
    * Int value. The number of threads used to listen to task heartbeat requests.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_TASK_LISTENER_THREAD_COUNT =
       TEZ_AM_PREFIX + "task.listener.thread-count";
   public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
@@ -267,6 +315,7 @@ public class TezConfiguration extends Configuration {
    * counters. Expert level setting.
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_COUNTERS_MAX = TEZ_PREFIX + "counters.max";
   public static final int TEZ_COUNTERS_MAX_DEFAULT = 1200;
 
@@ -276,6 +325,7 @@ public class TezConfiguration extends Configuration {
    * counters. Expert level setting.
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_COUNTERS_MAX_GROUPS = TEZ_PREFIX + "counters.max.groups";
   public static final int TEZ_COUNTERS_MAX_GROUPS_DEFAULT = 500;
 
@@ -285,6 +335,7 @@ public class TezConfiguration extends Configuration {
    * counters. Expert level setting.
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH =
       TEZ_PREFIX + "counters.counter-name.max-length";
   public static final int TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH_DEFAULT = 64;
@@ -295,6 +346,7 @@ public class TezConfiguration extends Configuration {
    * counters. Expert level setting.
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH =
       TEZ_PREFIX + "counters.group-name.max-length";
   public static final int TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH_DEFAULT = 128;
@@ -304,6 +356,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency 
    * when some tasks are running slower due bad/slow machines
    */
+  @ConfigurationScope(Scope.VERTEX)  // TODO Verify the vertex speculation, TEZ-1788
   public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled";
   public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false;
   
@@ -312,6 +365,7 @@ public class TezConfiguration extends Configuration {
    * should be considered as an outlier/slow task.
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD = 
                                      TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold";
 
@@ -319,6 +373,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Upper limit on the number of threads user to launch containers in the app
    * master. Expert level setting. 
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
     TEZ_AM_PREFIX + "containerlauncher.thread-count-limit";
 
@@ -329,6 +384,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Int value. Specifies the number of task failures on a node before the node is considered faulty.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX
       + "maxtaskfailures.per.node";
   public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 10;
@@ -339,6 +395,7 @@ public class TezConfiguration extends Configuration {
    * is for cases where the app master is not at fault but is lost due to system errors.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX + 
       "max.app.attempts";
   public static final int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
@@ -347,6 +404,7 @@ public class TezConfiguration extends Configuration {
    * Int value. The maximum number of attempts that can fail for a particular task before the task is failed. 
    * This does not count killed attempts. Task failure results in DAG failure.
    */
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_AM_TASK_MAX_FAILED_ATTEMPTS =
       TEZ_AM_PREFIX + "task.max.failed.attempts";
   public static final int TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT = 4;
@@ -355,6 +413,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes 
    * will not be used to execute tasks.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX
       + "node-blacklisting.enabled";
   public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
@@ -364,11 +423,13 @@ public class TezConfiguration extends Configuration {
    * This limits the number of nodes that are blacklisted in an effort to minimize the effects of 
    * temporary surges in failures (e.g. due to network outages). 
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_AM_PREFIX
       + "node-blacklisting.ignore-threshold-node-percent";
   public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
 
   /** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CLIENT_THREAD_COUNT =
       TEZ_AM_PREFIX + "client.am.thread-count";
   public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1;
@@ -377,22 +438,26 @@ public class TezConfiguration extends Configuration {
    * String value. Range of ports that the AM can use when binding for client connections. Leave blank
    * to use all possible ports. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
       TEZ_AM_PREFIX + "client.am.port-range";
 
   /**
    * String value. The class to be used for DAG Scheduling. Expert level setting.
    */
+  @ConfigurationScope(Scope.DAG)
   public static final String TEZ_AM_DAG_SCHEDULER_CLASS = TEZ_AM_PREFIX + "dag.scheduler.class";
   public static final String TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT =
       "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder";
 
   /** Int value. The amount of memory in MB to be used by the AppMaster */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
       + "resource.memory.mb";
   public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1024;
 
   /** Int value. The number of virtual cores to be used by the app master */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
       + "resource.cpu.vcores";
   public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
@@ -400,6 +465,7 @@ public class TezConfiguration extends Configuration {
   /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
    * all vertices. Setting it to the same value for all tasks is helpful for container reuse and 
    * thus good for performance typically. */
+  @ConfigurationScope(Scope.DAG)  // TODO vertex level
   public static final String TEZ_TASK_RESOURCE_MEMORY_MB = TEZ_TASK_PREFIX
       + "resource.memory.mb";
   public static final int TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT = 1024;
@@ -407,6 +473,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Int value. The number of virtual cores to be used by tasks.
    */
+  @ConfigurationScope(Scope.DAG)  // TODO vertex level
   public static final String TEZ_TASK_RESOURCE_CPU_VCORES = TEZ_TASK_PREFIX
       + "resource.cpu.vcores";
   public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1; 
@@ -416,6 +483,7 @@ public class TezConfiguration extends Configuration {
    * Increasing this reduces the communication between the AM and the RM and can
    * help in scaling up. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
       + "am-rm.heartbeat.interval-ms.max";
   public static final int TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT = 1000;
@@ -425,6 +493,7 @@ public class TezConfiguration extends Configuration {
    * AM for another task. Increasing this can help improve app master scalability for a large 
    * number of concurrent tasks. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX = TEZ_TASK_PREFIX
       + "get-task.sleep.interval-ms.max";
   public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 200;
@@ -434,6 +503,7 @@ public class TezConfiguration extends Configuration {
    * Increasing this can help improve app master scalability for a large number of concurrent tasks.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
       + "am.heartbeat.interval-ms.max";
   public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
@@ -443,6 +513,7 @@ public class TezConfiguration extends Configuration {
    * tasks. This reduces the amount of network traffice between AM and tasks to send high-volume 
    * counters. Improves AM scalability. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS = TEZ_TASK_PREFIX
       + "am.heartbeat.counter.interval-ms.max";
   public static final int TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
@@ -452,6 +523,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Maximum number of of events to fetch from the AM by the tasks in a single heartbeat.
    * Expert level setting. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
       + "max-events-per-heartbeat";
   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
@@ -463,6 +535,7 @@ public class TezConfiguration extends Configuration {
    */
   @Unstable
   @Private
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_GENERATE_COUNTERS_PER_IO = TEZ_TASK_PREFIX
       + "generate.counters.per.io";
   @Private
@@ -473,6 +546,7 @@ public class TezConfiguration extends Configuration {
    * before its considered lost.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TASK_HEARTBEAT_TIMEOUT_MS = TEZ_TASK_PREFIX + "timeout-ms";
 
   public static final int TASK_HEARTBEAT_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
@@ -481,6 +555,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Time interval, in milliseconds, between checks for lost tasks.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TASK_HEARTBEAT_TIMEOUT_CHECK_MS = TEZ_TASK_PREFIX + "heartbeat.timeout.check-ms";
 
   public static final int TASK_HEARTBEAT_TIMEOUT_CHECK_MS_DEFAULT = 30 * 1000;
@@ -491,6 +566,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
       + "scale.memory.enabled";
   @Private
@@ -501,6 +577,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX
       + "scale.memory.allocator.class";
   @Private
@@ -513,6 +590,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
       + "scale.memory.reserve-fraction";
   @Private
@@ -524,6 +602,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO =
       TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io";
 
@@ -532,6 +611,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Max cumulative total reservation for additional IOs.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX =
       TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max";
   /*
@@ -541,6 +621,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
       TEZ_TASK_PREFIX + "scale.memory.ratios";
 
@@ -549,6 +630,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Defines the ProcessTree implementation which will be used to collect resource utilization.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS =
       TEZ_TASK_PREFIX + "resource.calculator.process-tree.class";
 
@@ -557,6 +639,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value. Configuration to specify whether container should be reused across tasks.
    * This improves performance by not incurring recurring launch overheads.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONTAINER_REUSE_ENABLED = TEZ_AM_PREFIX
       + "container.reuse.enabled";
   public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = true;
@@ -565,6 +648,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value. Whether to reuse containers for rack local tasks. Active only if reuse is
    * enabled.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED =
       TEZ_AM_PREFIX + "container.reuse.rack-fallback.enabled";
   public static final boolean
@@ -575,6 +659,7 @@ public class TezConfiguration extends Configuration {
    * enabled. Turning this on can severely affect locality and can be bad for jobs with high data 
    * volume being read from the primary data sources.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED =
       TEZ_AM_PREFIX + "container.reuse.non-local-fallback.enabled";
   public static final boolean
@@ -584,6 +669,7 @@ public class TezConfiguration extends Configuration {
    * Int value. The amount of time to wait before assigning a container to the next level
    * of locality. NODE -> RACK -> NON_LOCAL. Delay scheduling parameter. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String
       TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS =
       TEZ_AM_PREFIX + "container.reuse.locality.delay-allocation-millis";
@@ -594,6 +680,7 @@ public class TezConfiguration extends Configuration {
    * Int value. The minimum amount of time to hold on to a container that is idle. Only active when 
    * reuse is enabled. Set to -1 to never release idle containers (not recommended). 
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS = 
     TEZ_AM_PREFIX + "container.idle.release-timeout-min.millis";
   public static final long
@@ -609,6 +696,7 @@ public class TezConfiguration extends Configuration {
    * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This 
    * creates a graceful reduction in the amount of idle resources held
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS = 
       TEZ_AM_PREFIX + "container.idle.release-timeout-max.millis";
   public static final long
@@ -619,6 +707,7 @@ public class TezConfiguration extends Configuration {
    * non-session mode. Enables an idle session (not running any DAG) to hold on to a minimum number
    * of containers to provide fast response times for the next DAG.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_SESSION_MIN_HELD_CONTAINERS = 
       TEZ_AM_PREFIX + "session.min.held-containers";
   public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0;
@@ -632,6 +721,7 @@ public class TezConfiguration extends Configuration {
    * the cost of losing work. Setting to 0 turns off preemption. Expert level
    * setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_PREEMPTION_PERCENTAGE = 
       TEZ_AM_PREFIX + "preemption.percentage";
   public static final int TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT = 10;
@@ -641,6 +731,7 @@ public class TezConfiguration extends Configuration {
    * more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the 
    * RM can act on the released resources and assign new ones to us. Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS = 
       TEZ_AM_PREFIX + "preemption.heartbeats-between-preemptions";
   public static final int TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT = 3;
@@ -666,6 +757,7 @@ public class TezConfiguration extends Configuration {
    * </ul>
    * </ol>
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_LIB_URIS = TEZ_PREFIX + "lib.uris";
 
   /**
@@ -680,6 +772,7 @@ public class TezConfiguration extends Configuration {
    * All duplicate resources are ignored.
    *
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AUX_URIS = TEZ_PREFIX + "aux.uris";
 
   /**
@@ -687,6 +780,7 @@ public class TezConfiguration extends Configuration {
    * raw Tez application where classpath is propagated with application
    * via {@link LocalResource}s. This is mainly useful for developer/debugger scenarios.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_IGNORE_LIB_URIS = TEZ_PREFIX + "ignore.lib.uris";
 
   /**
@@ -695,6 +789,7 @@ public class TezConfiguration extends Configuration {
    * This is disabled by default - with the expectation being that tez.lib.uris has a complete
    * tez-deployment which contains the hadoop libraries.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_USE_CLUSTER_HADOOP_LIBS = TEZ_PREFIX + "use.cluster.hadoop-libs";
   public static final boolean TEZ_USE_CLUSTER_HADOOP_LIBS_DEFAULT = false;
 
@@ -705,6 +800,7 @@ public class TezConfiguration extends Configuration {
    * This will be prepended to the classpath before all framework specific components have been
    * specified.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX =
       TEZ_PREFIX + "cluster.additional.classpath.prefix";
 
@@ -721,6 +817,7 @@ public class TezConfiguration extends Configuration {
    * AM then this timeout may be hit. In those case, using non-session mode is recommended if 
    * applicable. Otherwise increase the timeout (set to -1 for infinity. Not recommended)
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_SESSION_CLIENT_TIMEOUT_SECS =
       TEZ_SESSION_PREFIX + "client.timeout.secs";
   public static final int TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT =
@@ -730,6 +827,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Time (in seconds) for which the Tez AM should wait for a DAG to be submitted before
    * shutting down. Only relevant in session mode.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS =
       TEZ_SESSION_PREFIX + "am.dag.submit.timeout.secs";
   public static final int TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT =
@@ -738,12 +836,14 @@ public class TezConfiguration extends Configuration {
   /**
    * String value. The queue name for all jobs being submitted from a given client.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_QUEUE_NAME = TEZ_PREFIX + "queue.name";
 
   @Unstable
   /**
    * Boolean value. Generate debug artifacts like DAG plan in text format.
    */
+  @ConfigurationScope(Scope.DAG)
   public static final String TEZ_GENERATE_DEBUG_ARTIFACTS =
       TEZ_PREFIX + "generate.debug.artifacts";
   public static final boolean TEZ_GENERATE_DEBUG_ARTIFACTS_DEFAULT = false;
@@ -760,6 +860,7 @@ public class TezConfiguration extends Configuration {
    * v[] - Additional launch-cmd options for all tasks in vertex v
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST = TEZ_PREFIX + "task-specific" +
       ".launch.cmd-opts.list";
 
@@ -771,6 +872,7 @@ public class TezConfiguration extends Configuration {
    * "-agentpath:libpagent.so,dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__"
    */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS = TEZ_PREFIX + "task-specific" +
       ".launch.cmd-opts";
 
@@ -791,12 +893,14 @@ public class TezConfiguration extends Configuration {
    *   org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid
    * */
   @Unstable
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_TASK_SPECIFIC_LOG_LEVEL = TEZ_PREFIX + "task-specific" + ".log.level";
 
   /**
    * String value that is a class name.
    * Specify the class to use for logging history data
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS =
       TEZ_PREFIX + "history.logging.service.class";
 
@@ -808,6 +912,7 @@ public class TezConfiguration extends Configuration {
    * container logging directory. This is relevant only when SimpleHistoryLoggingService is being
    * used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
       TEZ_PREFIX + "simple.history.logging.dir";
   
@@ -815,6 +920,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Maximum errors allowed while logging history data. After crossing this limit history
    * logging gets disabled. The job continues to run after this.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS =
       TEZ_PREFIX + "simple.history.max.errors";
   public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10;
@@ -823,6 +929,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
       TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
   public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
@@ -832,6 +939,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Max no. of events to send in a single batch to ATS.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String YARN_ATS_MAX_EVENTS_PER_BATCH =
       TEZ_PREFIX + "yarn.ats.max.events.per.batch";
   public static final int YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT = 5;
@@ -841,17 +949,20 @@ public class TezConfiguration extends Configuration {
    * Int value. Time, in milliseconds, to wait for an event before sending a batch to ATS.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String YARN_ATS_MAX_POLLING_TIME_PER_EVENT = TEZ_PREFIX
       + "yarn.ats.max.polling.time.per.event.millis";
   public static final int YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT = 10;
 
-
+  @ConfigurationScope(Scope.AM)
   public static final String YARN_ATS_ACL_DOMAINS_AUTO_CREATE = TEZ_PREFIX
       + "yarn.ats.acl.domains.auto-create";
   public static final boolean YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT = true;
 
+  @ConfigurationScope(Scope.AM)
   public static final String YARN_ATS_ACL_SESSION_DOMAIN_ID = TEZ_PREFIX
       + "yarn.ats.acl.session.domain.id";
+  @ConfigurationScope(Scope.DAG)
   public static final String YARN_ATS_ACL_DAG_DOMAIN_ID = TEZ_PREFIX
       + "yarn.ats.acl.dag.domain.id";
 
@@ -859,6 +970,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the 
    * incomplete DAGs from the previous instance of the app master.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String DAG_RECOVERY_ENABLED =
       TEZ_PREFIX + "dag.recovery.enabled";
   public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
@@ -867,6 +979,7 @@ public class TezConfiguration extends Configuration {
    * Int value. Size in bytes for the IO buffer size while processing the recovery file.
    * Expert level setting.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
       TEZ_PREFIX + "dag.recovery.io.buffer.size";
   public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
@@ -874,6 +987,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Int value. Number of recovery events to buffer before flushing them to the recovery log.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String DAG_RECOVERY_MAX_UNFLUSHED_EVENTS =
       TEZ_PREFIX + "dag.recovery.max.unflushed.events";
   public static final int DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT = 100;
@@ -881,6 +995,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Int value. Interval, in seconds, between flushing recovery data to the recovery log.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String DAG_RECOVERY_FLUSH_INTERVAL_SECS =
       TEZ_PREFIX + "dag.recovery.flush.interval.secs";
   public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
@@ -889,6 +1004,7 @@ public class TezConfiguration extends Configuration {
    *  Boolean value. Enable local mode execution in Tez. Enables tasks to run in the same process as
    *  the app master. Primarily used for debugging.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_LOCAL_MODE =
     TEZ_PREFIX + "local.mode";
 
@@ -898,6 +1014,7 @@ public class TezConfiguration extends Configuration {
    *  Tez AM Inline Mode flag. Not valid till Tez-684 get checked-in
    */
   @Private
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_INLINE_TASK_EXECUTION_ENABLED =
     TEZ_AM_PREFIX + "inline.task.execution.enabled";
 
@@ -911,6 +1028,7 @@ public class TezConfiguration extends Configuration {
    * Int value.
    * The maximium number of tasks running in parallel within the app master process.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS =
     TEZ_AM_PREFIX + "inline.task.execution.max-tasks";
 
@@ -935,6 +1053,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Boolean value. Configuration to enable/disable ACL checks.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_ACLS_ENABLED = TEZ_AM_PREFIX + "acls.enabled";
   public static final boolean TEZ_AM_ACLS_ENABLED_DEFAULT = true;
 
@@ -945,6 +1064,7 @@ public class TezConfiguration extends Configuration {
    * Comma separated list of users, followed by whitespace, followed by a comma separated list of 
    * groups
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_VIEW_ACLS = TEZ_AM_PREFIX + "view-acls";
 
   /**
@@ -954,12 +1074,14 @@ public class TezConfiguration extends Configuration {
    * Comma separated list of users, followed by whitespace, followed by a comma separated list of 
    * groups
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_MODIFY_ACLS = TEZ_AM_PREFIX + "modify-acls";
 
   /**
    * Boolean value.
    * Disable version check between client and AM/DAG. Default false.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_DISABLE_CLIENT_VERSION_CHECK = TEZ_AM_PREFIX
       + "disable.client-version-check";
   public static final boolean TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT = false;
@@ -968,6 +1090,7 @@ public class TezConfiguration extends Configuration {
    * Boolean value.
    * Allow disabling of Timeline Domains even if Timeline is being used.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS = TEZ_PREFIX
       + "allow.disabled.timeline-domains";
   public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
@@ -986,6 +1109,7 @@ public class TezConfiguration extends Configuration {
    *   For example, "http://uihost:9001/#/tez-app/__APPLICATION_ID__/ will be replaced to
    *   http://uihost:9001/#/tez-app/application_1421880306565_0001/
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE = TEZ_AM_PREFIX
       + "tez-ui.history-url.template";
   public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT =
@@ -998,6 +1122,7 @@ public class TezConfiguration extends Configuration {
    * if the ui is hosted on the default port (80 for http and 443 for https), the port should not
    * be specified.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_HISTORY_URL_BASE = TEZ_PREFIX
       + "tez-ui.history-url.base";
 
@@ -1006,7 +1131,28 @@ public class TezConfiguration extends Configuration {
    * Allow disabling of the Tez AM webservice. If set to false the Tez-UI wont show progress
    * updates for running application.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_WEBSERVICE_ENABLE = TEZ_AM_PREFIX
       + "tez-ui.webservice.enable";
   public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
+
+  // TODO only validate property here, value can also be validated if necessary
+  public static void validateProperty(String property, Scope usedScope) {
+    Scope validScope = PropertyScope.get(property);
+    if (validScope == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(property + " is not standard configuration property of tez, can not been validated");
+      }
+    } else {
+      if (usedScope.ordinal() > validScope.ordinal()) {
+        throw new IllegalStateException(property + " is set at the scope of " + usedScope
+            + ", but it is only valid in the scope of " + validScope);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static Set<String> getPropertySet() {
+    return PropertyScope.keySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 91d8cdf..c8d3df7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -55,6 +55,7 @@ public class Vertex {
   private Resource taskResource;
   private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
+  private Map<String, String> vertexConf = new HashMap<String, String>();
   private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs 
                       = new ArrayList<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
   private final List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs 
@@ -288,6 +289,10 @@ public class Vertex {
     return taskEnvironment;
   }
 
+  public Map<String, String> getConf() {
+    return vertexConf;
+  }
+
   /**
    * Set the command opts for tasks of this vertex. This method should be used 
    * when different vertices have different opts. Else, set the launch opts for '
@@ -386,6 +391,12 @@ public class Vertex {
 	  return taskLaunchCmdOpts;
   }
 
+  public Vertex setConf(String property, String value) {
+    TezConfiguration.validateProperty(property, Scope.VERTEX);
+    this.vertexConf.put(property, value);
+    return this;
+  }
+
   @Override
   public String toString() {
     return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 0539405..dbd569a 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -138,6 +138,7 @@ message VertexPlan {
   repeated RootInputLeafOutputProto inputs = 8;
   repeated RootInputLeafOutputProto outputs = 9;
   optional TezEntityDescriptorProto vertex_manager_plugin = 10;
+  optional ConfigurationProto vertexConf = 11;
 }
 
 message EdgePlan {
@@ -160,7 +161,7 @@ message DAGPlan {
   required string name = 1;
   repeated VertexPlan vertex = 2;
   repeated EdgePlan edge = 3;
-  optional ConfigurationProto dagKeyValues = 4;
+  optional ConfigurationProto dagConf = 4;
   optional bytes credentials_binary = 5;
   repeated PlanVertexGroupInfo vertex_groups = 6;
   repeated PlanLocalResource local_resource = 7;

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index dd9feff..8e7f80b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -136,4 +136,54 @@ public class TestDAG {
       Assert.assertTrue(e.getMessage().contains("already defined"));
     }
   }
+
+  @Test(timeout = 5000)
+  public void testDAGConf() {
+    DAG dag = DAG.create("dag1");
+    // it's OK to set custom configuration
+    dag.setConf("unknown_conf", "value");
+
+    // set invalid AM level configuration
+    try {
+      dag.setConf(TezConfiguration.TEZ_AM_SESSION_MODE, true+"");
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals("tez.am.mode.session is set at the scope of DAG,"
+          + " but it is only valid in the scope of AM",
+          e.getMessage());
+    }
+    // set valid DAG level configuration
+    dag.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false + "");
+    // set valid Vertex level configuration
+    dag.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 3 + "");
+  }
+
+  @Test(timeout = 5000)
+  public void testVertexConf() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
+    // it's OK to set custom property
+    v1.setConf("unknown_conf", "value");
+
+    // set invalid AM level configuration
+    try {
+      v1.setConf(TezConfiguration.TEZ_AM_SESSION_MODE, true+"");
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals("tez.am.mode.session is set at the scope of VERTEX,"
+          + " but it is only valid in the scope of AM",
+          e.getMessage());
+    }
+
+    // set invalid DAG level configuration
+    try {
+      v1.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false + "");
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertEquals("tez.am.commit-all-outputs-on-dag-success is set at the scope of VERTEX,"
+          + " but it is only valid in the scope of DAG",
+          e.getMessage());
+    }
+    // set valid Vertex level configuration
+    v1.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 3 + "");
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 74e780c..1964100 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -1044,7 +1044,7 @@ public class TestDAGVerify {
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
 
-    ConfigurationProto confProto = dagPlan.getDagKeyValues();
+    ConfigurationProto confProto = dagPlan.getDagConf();
     boolean foundViewAcls = false;
     boolean foundModifyAcls = false;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
index 9ec4853..4423f16 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
@@ -18,6 +18,14 @@
 
 package org.apache.tez.dag.api;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
@@ -44,4 +52,28 @@ public class TestTezConfiguration {
     Assert.assertNull(tezConf5.get(TezConfiguration.TEZ_LIB_URIS));
   }
 
+  @Test(timeout = 5000)
+  public void testKeySet() throws IllegalAccessException {
+    Class<?> c = TezConfiguration.class;
+    Set<String> expectedKeys = new HashSet<String>();
+    for (Field f : c.getFields()) {
+      if (!f.getName().endsWith("DEFAULT") && f.getType() == String.class) {
+        String value = (String)f.get(null);
+        // not prefix
+        if (!value.endsWith(".")) {
+          expectedKeys.add((String) f.get(null));
+          Assert.assertNotNull("field " + f.getName() + " do not have annotation of ConfigurationScope.",
+              f.getAnnotation(ConfigurationScope.class));
+        }
+      }
+    }
+
+    Set<String> actualKeySet = TezConfiguration.getPropertySet();
+    for (String key : actualKeySet) {
+      if (!expectedKeys.remove(key)) {
+        fail("Found unexpected key: " + key + " in key set");
+      }
+    }
+    assertTrue("Missing keys in key set: " + expectedKeys, expectedKeys.size() == 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 5aca3cf..8fd5626 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -766,15 +766,6 @@ public class DAGAppMaster extends AbstractService {
           dagCounter.incrementAndGet());
     }
 
-    Iterator<PlanKeyValuePair> iter =
-        dagPB.getDagKeyValues().getConfKeyValuesList().iterator();
-    Configuration dagConf = new Configuration(amConf);
-
-    while (iter.hasNext()) {
-      PlanKeyValuePair keyValPair = iter.next();
-      dagConf.set(keyValPair.getKey(), keyValPair.getValue());
-    }
-
     Credentials dagCredentials = null;
     if (dagPB.hasCredentialsBinary()) {
       dagCredentials = DagTypeConverters.convertByteStringToCredentials(dagPB
@@ -788,7 +779,7 @@ public class DAGAppMaster extends AbstractService {
 
     // create single dag
     DAGImpl newDag =
-        new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
+        new DAGImpl(dagId, amConf, dagPB, dispatcher.getEventHandler(),
             taskAttemptListener, dagCredentials, clock,
             appMasterUgi.getShortUserName(),
             taskHeartbeatHandler, context);

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 74b4080..291a0c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -151,4 +152,5 @@ public interface Vertex extends Comparable<Vertex> {
 
   public int getKilledTaskAttemptCount();
 
+  public Configuration getConf();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index aa7723b..dd38c29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -59,6 +59,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -71,6 +72,7 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -172,7 +174,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   TezCounters fullCounters = null;
   private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
 
-  public final Configuration conf;
+  public final Configuration dagConf;
   private final DAGPlan jobPlan;
   
   Map<String, LocalResource> localResources;
@@ -407,7 +409,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   public DAGImpl(TezDAGID dagId,
-      Configuration conf,
+      Configuration amConf,
       DAGPlan jobPlan,
       EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
@@ -418,9 +420,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       AppContext appContext) {
     this.dagId = dagId;
     this.jobPlan = jobPlan;
-    this.conf = conf;
+    this.dagConf = new Configuration(amConf);
+    Iterator<PlanKeyValuePair> iter =
+        jobPlan.getDagConf().getConfKeyValuesList().iterator();
+    // override the amConf by using DAG level configuration
+    while (iter.hasNext()) {
+      PlanKeyValuePair keyValPair = iter.next();
+      TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG);
+      this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+    }
     this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
-
     this.userName = appUserName;
     this.clock = clock;
     this.appContext = appContext;
@@ -448,9 +457,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
 
     this.aclManager = new ACLManager(appContext.getAMACLManager(), dagUGI.getShortUserName(),
-        this.conf);
+        this.dagConf);
 
-    this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(conf);
+    this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf);
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -474,7 +483,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   // TODO maybe removed after TEZ-74
   @Override
   public Configuration getConf() {
-    return conf;
+    return dagConf;
   }
 
   @Override
@@ -1261,7 +1270,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       initTime = clock.getTime();
     }
 
-    commitAllOutputsOnSuccess = conf.getBoolean(
+    commitAllOutputsOnSuccess = dagConf.getBoolean(
         TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
 
@@ -1353,7 +1362,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private static void assignDAGScheduler(DAGImpl dag) {
-    String dagSchedulerClassName = dag.conf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
+    String dagSchedulerClassName = dag.dagConf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
         TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT);
     LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
     dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
@@ -1368,7 +1377,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
 
     VertexImpl v = new VertexImpl(
-        vertexId, vertexPlan, vertexName, dag.conf,
+        vertexId, vertexPlan, vertexName, dag.dagConf,
         dag.eventHandler, dag.taskAttemptListener,
         dag.clock, dag.taskHeartbeatHandler,
         !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 05c3cc1..145170c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -71,6 +71,7 @@ import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.Scope;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -85,6 +86,8 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -211,7 +214,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private TezCounters fullCounters = null;
   private Resource taskResource;
 
-  private Configuration conf;
+  private Configuration vertexConf;
   
   private final boolean isSpeculationEnabled;
 
@@ -717,7 +720,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
-      String vertexName, Configuration conf, EventHandler eventHandler,
+      String vertexName, Configuration dagConf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Clock clock,
       TaskHeartbeatHandler thh, boolean commitVertexOutputs,
       AppContext appContext, VertexLocationHint vertexLocationHint,
@@ -726,7 +729,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
-    this.conf = conf;
+    this.vertexConf = new Configuration(dagConf);
+    // override dag configuration by using vertex's specified configuration
+    if (vertexPlan.hasVertexConf()) {
+      ConfigurationProto confProto = vertexPlan.getVertexConf();
+      for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
+        TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
+        vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
+      }
+    }
     this.clock = clock;
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
@@ -761,7 +772,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // Set up log properties, including task specific log properties.
     String javaOptsWithoutLoggerMods =
         vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
-    String logString = conf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
+    String logString = vertexConf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
     String [] taskLogParams = TezClientUtils.parseLogParams(logString);
     this.javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts(taskLogParams[0], javaOptsWithoutLoggerMods);
 
@@ -803,11 +814,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     this.dagVertexGroups = dagVertexGroups;
     
-    isSpeculationEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+    isSpeculationEnabled = vertexConf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
         TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
-    
+    LOG.info("isSpeculationEnabled:" + isSpeculationEnabled);
     if (isSpeculationEnabled()) {
-      speculator = new LegacySpeculator(conf, getAppContext(), this);
+      speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
     }
     
     logIdentifier =  this.getVertexId() + " [" + this.getName() + "]";
@@ -819,6 +830,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     augmentStateMachine();
   }
 
+  @Override
+  public Configuration getConf() {
+    return vertexConf;
+  }
+
   private boolean isSpeculationEnabled() {
     return isSpeculationEnabled;
   }
@@ -2018,7 +2034,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       TaskImpl task =
           new TaskImpl(this.getVertexId(), i,
               this.eventHandler,
-              conf,
+              vertexConf,
               this.taskAttemptListener,
               this.clock,
               this.taskHeartbeatHandler,
@@ -2211,7 +2227,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Setting vertexManager to ShuffleVertexManager for "
             + logIdentifier);
         // shuffle vertex manager needs a conf payload
-        vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(),
+        vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(vertexConf).build(),
             dagUgi, this, appContext, stateChangeNotifier);
       } else {
         // schedule all tasks upon vertex start. Default behavior.

http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 599f01e..459ecad 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -64,11 +64,13 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
@@ -441,6 +443,10 @@ public class TestDAGImpl {
     LOG.info("Setting up dag plan");
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testverteximpl")
+        .setDagConf(ConfigurationProto.newBuilder()
+            .addConfKeyValues(PlanKeyValuePair.newBuilder()
+                .setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS)
+                .setValue(3 + "")))
         .addVertex(
             VertexPlan.newBuilder()
             .setName("vertex1")
@@ -460,6 +466,10 @@ public class TestDAGImpl {
                 .setTaskModule("x1.y1")
                 .build()
                 )
+            .setVertexConf(ConfigurationProto.newBuilder()
+                .addConfKeyValues(PlanKeyValuePair.newBuilder()
+                    .setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS)
+                    .setValue(2+"")))
             .addOutEdgeId("e1")
             .build()
             )
@@ -1117,7 +1127,7 @@ public class TestDAGImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testGroupDAGWithVertexReRunning() {
-    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    groupDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
     initDAG(groupDag);
     startDAG(groupDag);
     dispatcher.await();
@@ -1141,7 +1151,7 @@ public class TestDAGImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testGroupDAGWithVertexReRunningAfterCommit() {
-    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    groupDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
     initDAG(groupDag);
     startDAG(groupDag);
     dispatcher.await();
@@ -1329,7 +1339,7 @@ public class TestDAGImpl {
   @Test(timeout=5000)
   public void testDAGErrorAbortNonSuccessfulOutputs() {
     // vertex success -> vertex output commit. failed dag aborts only non-successful vertices
-    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+    mrrDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
     initDAG(mrrDag);
     dispatcher.await();
     startDAG(mrrDag);
@@ -1610,6 +1620,22 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  @Test(timeout = 5000)
+  public void testConfiguration() throws AMUserCodeException {
+    initDAG(dag);
+    // dag override the default configuration
+    Assert.assertEquals(3, dag.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+        TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
+    Vertex v1 = dag.getVertex("vertex1");
+    Vertex v2 = dag.getVertex("vertex2");
+    // v1 override the dagConfiguration
+    Assert.assertEquals(2, v1.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+        TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
+    // v2 inherit the configuration from dag
+    Assert.assertEquals(3, v2.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+        TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
+  }
+
   public static class CustomizedEdgeManager extends EdgeManagerPlugin {
 
     public static enum ExceptionLocation {