You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/02/11 03:13:02 UTC
tez git commit: TEZ-1233. Allow configuration of framework parameters per vertex (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master fe39ede33 -> eb9a0345c
TEZ-1233. Allow configuration of framework parameters per vertex (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/eb9a0345
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/eb9a0345
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/eb9a0345
Branch: refs/heads/master
Commit: eb9a0345c8617214be6e1cc331efb553af3e1ccc
Parents: fe39ede
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Feb 11 10:10:51 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Feb 11 10:11:41 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/ConfigurationScope.java | 33 ++++
.../main/java/org/apache/tez/dag/api/DAG.java | 58 ++++---
.../main/java/org/apache/tez/dag/api/Scope.java | 28 ++++
.../apache/tez/dag/api/TezConfiguration.java | 150 ++++++++++++++++++-
.../java/org/apache/tez/dag/api/Vertex.java | 11 ++
tez-api/src/main/proto/DAGApiRecords.proto | 3 +-
.../java/org/apache/tez/dag/api/TestDAG.java | 50 +++++++
.../org/apache/tez/dag/api/TestDAGVerify.java | 2 +-
.../tez/dag/api/TestTezConfiguration.java | 32 ++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 11 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 29 ++--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 34 +++--
.../tez/dag/app/dag/impl/TestDAGImpl.java | 32 +++-
15 files changed, 419 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9979c50..22ecb2a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1233. Allow configuration of framework parameters per vertex.
TEZ-2045. TaskAttemptListener should not pull Tasks from AMContainer. Instead these should be registered with the listener.
TEZ-1914. VertexManager logic should not run on the central dispatcher
TEZ-2023. Refactor logIndividualFetchComplete() to be common for both shuffle-schedulers.
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java b/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java
new file mode 100644
index 0000000..bf2a7d9
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ConfigurationScope.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface ConfigurationScope {
+
+ Scope value();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 7be5ba4..914f946 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -88,6 +88,7 @@ public class DAG {
private DAGAccessControls dagAccessControls;
Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
String dagInfo;
+ private Map<String,String> dagConf = new HashMap<String, String>();
private Stack<String> topologicalVertexStack = new Stack<String>();
@@ -322,7 +323,13 @@ public class DAG {
public String getName() {
return this.name;
}
-
+
+ public DAG setConf(String property, String value) {
+ TezConfiguration.validateProperty(property, Scope.DAG);
+ dagConf.put(property, value);
+ return this;
+ }
+
@Private
public Map<String, LocalResource> getTaskLocalFiles() {
return commonTaskLocalFiles;
@@ -657,22 +664,21 @@ public class DAG {
}
// create protobuf message describing DAG
- public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+ public DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
boolean tezLrsAsArchive) {
- return createDag(dagConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
+ return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
null);
}
// create protobuf message describing DAG
@Private
- public synchronized DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+ public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
boolean tezLrsAsArchive, Map<String, String> additionalConfigs) {
verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
-
dagBuilder.setName(this.name);
if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
dagBuilder.setDagInfo(this.dagInfo);
@@ -715,9 +721,9 @@ public class DAG {
// infer credentials, resources and parallelism from data source
Resource vertexTaskResource = vertex.getTaskResource();
if (vertexTaskResource == null) {
- vertexTaskResource = Resource.newInstance(dagConf.getInt(
+ vertexTaskResource = Resource.newInstance(tezConf.getInt(
TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
- TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
+ TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), tezConf.getInt(
TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
}
@@ -782,13 +788,24 @@ public class DAG {
}
}
+ if (vertex.getConf()!= null && vertex.getConf().size() > 0) {
+ ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder();
+ for (Map.Entry<String, String> entry : vertex.getConf().entrySet()) {
+ PlanKeyValuePair.Builder keyValueBuilder = PlanKeyValuePair.newBuilder();
+ keyValueBuilder.setKey(entry.getKey());
+ keyValueBuilder.setValue(entry.getValue());
+ confBuilder.addConfKeyValues(keyValueBuilder);
+ }
+ vertexBuilder.setVertexConf(confBuilder);
+ }
+
//task config
PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
taskConfigBuilder.setNumTasks(vertexParallelism);
taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory());
taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores());
taskConfigBuilder.setJavaOpts(
- TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), dagConf));
+ TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), tezConf));
taskConfigBuilder.setTaskModule(vertex.getName());
if (!vertexLRs.isEmpty()) {
@@ -796,7 +813,7 @@ public class DAG {
}
Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment());
- TezYARNUtils.setupDefaultEnv(taskEnv, dagConf,
+ TezYARNUtils.setupDefaultEnv(taskEnv, tezConf,
TezConfiguration.TEZ_TASK_LAUNCH_ENV,
TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
for (Map.Entry<String, String> entry : taskEnv.entrySet()) {
@@ -866,16 +883,6 @@ public class DAG {
ConfigurationProto.Builder confProtoBuilder =
ConfigurationProto.newBuilder();
- if (dagConf != null) {
- Iterator<Entry<String, String>> iter = dagConf.iterator();
- while (iter.hasNext()) {
- Entry<String, String> entry = iter.next();
- PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
- kvp.setKey(entry.getKey());
- kvp.setValue(entry.getValue());
- confProtoBuilder.addConfKeyValues(kvp);
- }
- }
if (dagAccessControls != null) {
Configuration aclConf = new Configuration(false);
dagAccessControls.serializeToConfiguration(aclConf);
@@ -885,6 +892,7 @@ public class DAG {
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
kvp.setKey(entry.getKey());
kvp.setValue(entry.getValue());
+ TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
confProtoBuilder.addConfKeyValues(kvp);
}
}
@@ -893,11 +901,19 @@ public class DAG {
PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
kvp.setKey(entry.getKey());
kvp.setValue(entry.getValue());
+ TezConfiguration.validateProperty(entry.getKey(), Scope.DAG);
+ confProtoBuilder.addConfKeyValues(kvp);
+ }
+ }
+ if (this.dagConf != null && !this.dagConf.isEmpty()) {
+ for (Entry<String, String> entry : this.dagConf.entrySet()) {
+ PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+ kvp.setKey(entry.getKey());
+ kvp.setValue(entry.getValue());
confProtoBuilder.addConfKeyValues(kvp);
}
}
- dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
- // should this replace BINARY_PB_CONF???
+ dagBuilder.setDagConf(confProtoBuilder);
if (dagCredentials != null) {
dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
new file mode 100644
index 0000000..d862e8f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Scope.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public enum Scope {
+ // DO NOT CHANGE THE ORDER
+ AM, // can only be set at AM level
+ DAG, // can be set at AM/DAG level
+ VERTEX, // can be set at AM/DAG/VERTEX level
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index bff5c6a..cfd6426 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -18,6 +18,13 @@
package org.apache.tez.dag.api;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -25,6 +32,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import com.google.common.annotations.VisibleForTesting;
+
+
/**
* Defines the configurations for Tez. These configurations are typically specified in
* tez-site.xml on the client machine where TezClient is used to launch the Tez application.
@@ -33,7 +43,11 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
@Public
public class TezConfiguration extends Configuration {
- public final static String TEZ_SITE_XML = "tez-site.xml";
+ private final static String TEZ_SITE_XML = "tez-site.xml";
+
+ private final static Log LOG = LogFactory.getLog(TezConfiguration.class);
+
+ private static Map<String, Scope> PropertyScope = new HashMap<String, Scope>();
static {
Configuration.addDeprecation("tez.am.counters.max.keys", TezConfiguration.TEZ_COUNTERS_MAX);
@@ -60,6 +74,24 @@ public class TezConfiguration extends Configuration {
Configuration.addDeprecation("tez.task.max-events-per-heartbeat.max",
TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT);
+
+ for (Field field : TezConfiguration.class.getFields()) {
+ if (field.isAnnotationPresent(ConfigurationScope.class)) {
+ ConfigurationScope confScope = field.getAnnotation(ConfigurationScope.class);
+ if (field.getType() == String.class) {
+ try {
+ PropertyScope.put(field.get(null).toString(), confScope.value());
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException(field.getName() + " is not String type, should not been annotated with "
+ + ConfigurationScope.class.getSimpleName());
+ }
+ }
+ }
}
public TezConfiguration() {
@@ -89,6 +121,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. If true then Tez will try to automatically delete temporary job
* artifacts that it creates within the specified staging dir. Does not affect any user data.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE = TEZ_AM_PREFIX +
"staging.scratch-data.auto-delete";
public static final boolean TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT = true;
@@ -96,6 +129,7 @@ public class TezConfiguration extends Configuration {
/**
* String value. Specifies a directory where Tez can create temporary job artifacts.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/"
+ System.getProperty("user.name") + "/tez/staging";
@@ -104,6 +138,7 @@ public class TezConfiguration extends Configuration {
* String value that is a file path.
* Path to a credentials file (with serialized credentials) located on the local file system.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_CREDENTIALS_PATH = TEZ_PREFIX + "credentials.path";
/**
@@ -114,6 +149,7 @@ public class TezConfiguration extends Configuration {
* by the same user. For long running applications, one-off executions, batch jobs etc non-session
* mode is recommended. If session mode is enabled then container reuse is recommended.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_SESSION_MODE = TEZ_AM_PREFIX + "mode.session";
public static final boolean TEZ_AM_SESSION_MODE_DEFAULT = false;
@@ -133,6 +169,7 @@ public class TezConfiguration extends Configuration {
* DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid
* org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid
* */
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX + "log.level";
public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
@@ -152,6 +189,7 @@ public class TezConfiguration extends Configuration {
* DEBUG;org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is valid
* org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid
* */
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_LOG_LEVEL = TEZ_TASK_PREFIX + "log.level";
public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO";
@@ -163,6 +201,7 @@ public class TezConfiguration extends Configuration {
* vertex succeeds. Depending on the desired output visibility and downstream consumer dependencies
* this value must be appropriately chosen. Defaults to the safe choice of true.
*/
+ @ConfigurationScope(Scope.DAG)
public static final String TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS =
TEZ_AM_PREFIX + "commit-all-outputs-on-dag-success";
public static final boolean TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT = true;
@@ -173,6 +212,7 @@ public class TezConfiguration extends Configuration {
* include default options meant to be used by all jobs in a cluster. If required, the values can
* be overridden per job.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS =
TEZ_AM_PREFIX + "launch.cluster-default.cmd-opts";
public static final String TEZ_AM_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT =
@@ -183,6 +223,7 @@ public class TezConfiguration extends Configuration {
* AppMaster process. Its recommended to not set any Xmx or Xms in these launch opts so that
* Tez can determine them automatically.
* */
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_LAUNCH_CMD_OPTS = TEZ_AM_PREFIX + "launch.cmd-opts";
public static final String TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT =
"-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC";
@@ -193,6 +234,7 @@ public class TezConfiguration extends Configuration {
* include default options meant to be used by all jobs in a cluster. If required, the values can
* be overridden per job.
*/
+ @ConfigurationScope(Scope.AM) // TODO DAG/Vertex level
public static final String TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS =
TEZ_TASK_PREFIX + "launch.cluster-default.cmd-opts";
public static final String TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_CMD_OPTS_DEFAULT =
@@ -203,6 +245,7 @@ public class TezConfiguration extends Configuration {
* processes. Its recommended to not set any Xmx or Xms in these launch opts
* so that Tez can determine them automatically.
*/
+ @ConfigurationScope(Scope.AM) // TODO DAG/Vertex level
public static final String TEZ_TASK_LAUNCH_CMD_OPTS = TEZ_TASK_PREFIX
+ "launch.cmd-opts";
public static final String TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT =
@@ -217,6 +260,7 @@ public class TezConfiguration extends Configuration {
* fraction that is applied to the memory allocated Factor to size Xmx based
* on container memory size. Value should be greater than 0 and less than 1.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION =
TEZ_PREFIX + "container.max.java.heap.fraction";
public static final double TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_DEFAULT = 0.8;
@@ -232,6 +276,7 @@ public class TezConfiguration extends Configuration {
* These take least precedence compared to other methods of setting env.
* These get added to the app master environment prior to launching it.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_LAUNCH_ENV = TEZ_AM_PREFIX
+ "launch.env";
public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = NATIVE_LIB_PARAM_DEFAULT;
@@ -243,11 +288,13 @@ public class TezConfiguration extends Configuration {
* These take least precedence compared to other methods of setting env
* These get added to the task environment prior to launching it.
*/
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_LAUNCH_ENV = TEZ_TASK_PREFIX
+ "launch.env";
public static final String TEZ_TASK_LAUNCH_ENV_DEFAULT = NATIVE_LIB_PARAM_DEFAULT;
@Private
+ @ConfigurationScope(Scope.DAG)
public static final String TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION = TEZ_PREFIX +
"cancel.delegation.tokens.on.completion";
public static final boolean TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION_DEFAULT = true;
@@ -256,6 +303,7 @@ public class TezConfiguration extends Configuration {
* Int value. The number of threads used to listen to task heartbeat requests.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_TASK_LISTENER_THREAD_COUNT =
TEZ_AM_PREFIX + "task.listener.thread-count";
public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
@@ -267,6 +315,7 @@ public class TezConfiguration extends Configuration {
* counters. Expert level setting.
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_COUNTERS_MAX = TEZ_PREFIX + "counters.max";
public static final int TEZ_COUNTERS_MAX_DEFAULT = 1200;
@@ -276,6 +325,7 @@ public class TezConfiguration extends Configuration {
* counters. Expert level setting.
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_COUNTERS_MAX_GROUPS = TEZ_PREFIX + "counters.max.groups";
public static final int TEZ_COUNTERS_MAX_GROUPS_DEFAULT = 500;
@@ -285,6 +335,7 @@ public class TezConfiguration extends Configuration {
* counters. Expert level setting.
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH =
TEZ_PREFIX + "counters.counter-name.max-length";
public static final int TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH_DEFAULT = 64;
@@ -295,6 +346,7 @@ public class TezConfiguration extends Configuration {
* counters. Expert level setting.
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH =
TEZ_PREFIX + "counters.group-name.max-length";
public static final int TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH_DEFAULT = 128;
@@ -304,6 +356,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency
* when some tasks are running slower due bad/slow machines
*/
+ @ConfigurationScope(Scope.VERTEX) // TODO Verify the vertex speculation, TEZ-1788
public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled";
public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false;
@@ -312,6 +365,7 @@ public class TezConfiguration extends Configuration {
* should be considered as an outlier/slow task.
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_LEGACY_SPECULATIVE_SLOWTASK_THRESHOLD =
TEZ_AM_PREFIX + "legacy.speculative.slowtask.threshold";
@@ -319,6 +373,7 @@ public class TezConfiguration extends Configuration {
* Int value. Upper limit on the number of threads user to launch containers in the app
* master. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
TEZ_AM_PREFIX + "containerlauncher.thread-count-limit";
@@ -329,6 +384,7 @@ public class TezConfiguration extends Configuration {
/**
* Int value. Specifies the number of task failures on a node before the node is considered faulty.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX
+ "maxtaskfailures.per.node";
public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 10;
@@ -339,6 +395,7 @@ public class TezConfiguration extends Configuration {
* is for cases where the app master is not at fault but is lost due to system errors.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX +
"max.app.attempts";
public static final int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
@@ -347,6 +404,7 @@ public class TezConfiguration extends Configuration {
* Int value. The maximum number of attempts that can fail for a particular task before the task is failed.
* This does not count killed attempts. Task failure results in DAG failure.
*/
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_AM_TASK_MAX_FAILED_ATTEMPTS =
TEZ_AM_PREFIX + "task.max.failed.attempts";
public static final int TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT = 4;
@@ -355,6 +413,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes
* will not be used to execute tasks.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX
+ "node-blacklisting.enabled";
public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
@@ -364,11 +423,13 @@ public class TezConfiguration extends Configuration {
* This limits the number of nodes that are blacklisted in an effort to minimize the effects of
* temporary surges in failures (e.g. due to network outages).
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_AM_PREFIX
+ "node-blacklisting.ignore-threshold-node-percent";
public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
/** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CLIENT_THREAD_COUNT =
TEZ_AM_PREFIX + "client.am.thread-count";
public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1;
@@ -377,22 +438,26 @@ public class TezConfiguration extends Configuration {
* String value. Range of ports that the AM can use when binding for client connections. Leave blank
* to use all possible ports. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
TEZ_AM_PREFIX + "client.am.port-range";
/**
* String value. The class to be used for DAG Scheduling. Expert level setting.
*/
+ @ConfigurationScope(Scope.DAG)
public static final String TEZ_AM_DAG_SCHEDULER_CLASS = TEZ_AM_PREFIX + "dag.scheduler.class";
public static final String TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT =
"org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder";
/** Int value. The amount of memory in MB to be used by the AppMaster */
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
+ "resource.memory.mb";
public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1024;
/** Int value. The number of virtual cores to be used by the app master */
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
+ "resource.cpu.vcores";
public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
@@ -400,6 +465,7 @@ public class TezConfiguration extends Configuration {
/** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
* all vertices. Setting it to the same value for all tasks is helpful for container reuse and
* thus good for performance typically. */
+ @ConfigurationScope(Scope.DAG) // TODO vertex level
public static final String TEZ_TASK_RESOURCE_MEMORY_MB = TEZ_TASK_PREFIX
+ "resource.memory.mb";
public static final int TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT = 1024;
@@ -407,6 +473,7 @@ public class TezConfiguration extends Configuration {
/**
* Int value. The number of virtual cores to be used by tasks.
*/
+ @ConfigurationScope(Scope.DAG) // TODO vertex level
public static final String TEZ_TASK_RESOURCE_CPU_VCORES = TEZ_TASK_PREFIX
+ "resource.cpu.vcores";
public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1;
@@ -416,6 +483,7 @@ public class TezConfiguration extends Configuration {
* Increasing this reduces the communication between the AM and the RM and can
* help in scaling up. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
+ "am-rm.heartbeat.interval-ms.max";
public static final int TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT = 1000;
@@ -425,6 +493,7 @@ public class TezConfiguration extends Configuration {
* AM for another task. Increasing this can help improve app master scalability for a large
* number of concurrent tasks. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX = TEZ_TASK_PREFIX
+ "get-task.sleep.interval-ms.max";
public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 200;
@@ -434,6 +503,7 @@ public class TezConfiguration extends Configuration {
* Increasing this can help improve app master scalability for a large number of concurrent tasks.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
+ "am.heartbeat.interval-ms.max";
public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
@@ -443,6 +513,7 @@ public class TezConfiguration extends Configuration {
* tasks. This reduces the amount of network traffice between AM and tasks to send high-volume
* counters. Improves AM scalability. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS = TEZ_TASK_PREFIX
+ "am.heartbeat.counter.interval-ms.max";
public static final int TEZ_TASK_AM_HEARTBEAT_COUNTER_INTERVAL_MS_DEFAULT =
@@ -452,6 +523,7 @@ public class TezConfiguration extends Configuration {
* Int value. Maximum number of of events to fetch from the AM by the tasks in a single heartbeat.
* Expert level setting. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+ "max-events-per-heartbeat";
public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
@@ -463,6 +535,7 @@ public class TezConfiguration extends Configuration {
*/
@Unstable
@Private
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_GENERATE_COUNTERS_PER_IO = TEZ_TASK_PREFIX
+ "generate.counters.per.io";
@Private
@@ -473,6 +546,7 @@ public class TezConfiguration extends Configuration {
* before its considered lost.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TASK_HEARTBEAT_TIMEOUT_MS = TEZ_TASK_PREFIX + "timeout-ms";
public static final int TASK_HEARTBEAT_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
@@ -481,6 +555,7 @@ public class TezConfiguration extends Configuration {
* Int value. Time interval, in milliseconds, between checks for lost tasks.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TASK_HEARTBEAT_TIMEOUT_CHECK_MS = TEZ_TASK_PREFIX + "heartbeat.timeout.check-ms";
public static final int TASK_HEARTBEAT_TIMEOUT_CHECK_MS_DEFAULT = 30 * 1000;
@@ -491,6 +566,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
+ "scale.memory.enabled";
@Private
@@ -501,6 +577,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX
+ "scale.memory.allocator.class";
@Private
@@ -513,6 +590,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
+ "scale.memory.reserve-fraction";
@Private
@@ -524,6 +602,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO =
TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io";
@@ -532,6 +611,7 @@ public class TezConfiguration extends Configuration {
/**
* Max cumulative total reservation for additional IOs.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX =
TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max";
/*
@@ -541,6 +621,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
TEZ_TASK_PREFIX + "scale.memory.ratios";
@@ -549,6 +630,7 @@ public class TezConfiguration extends Configuration {
/**
* Defines the ProcessTree implementation which will be used to collect resource utilization.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS =
TEZ_TASK_PREFIX + "resource.calculator.process-tree.class";
@@ -557,6 +639,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. Configuration to specify whether container should be reused across tasks.
* This improves performance by not incurring recurring launch overheads.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONTAINER_REUSE_ENABLED = TEZ_AM_PREFIX
+ "container.reuse.enabled";
public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = true;
@@ -565,6 +648,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. Whether to reuse containers for rack local tasks. Active only if reuse is
* enabled.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED =
TEZ_AM_PREFIX + "container.reuse.rack-fallback.enabled";
public static final boolean
@@ -575,6 +659,7 @@ public class TezConfiguration extends Configuration {
* enabled. Turning this on can severely affect locality and can be bad for jobs with high data
* volume being read from the primary data sources.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED =
TEZ_AM_PREFIX + "container.reuse.non-local-fallback.enabled";
public static final boolean
@@ -584,6 +669,7 @@ public class TezConfiguration extends Configuration {
* Int value. The amount of time to wait before assigning a container to the next level
* of locality. NODE -> RACK -> NON_LOCAL. Delay scheduling parameter. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String
TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS =
TEZ_AM_PREFIX + "container.reuse.locality.delay-allocation-millis";
@@ -594,6 +680,7 @@ public class TezConfiguration extends Configuration {
* Int value. The minimum amount of time to hold on to a container that is idle. Only active when
* reuse is enabled. Set to -1 to never release idle containers (not recommended).
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS =
TEZ_AM_PREFIX + "container.idle.release-timeout-min.millis";
public static final long
@@ -609,6 +696,7 @@ public class TezConfiguration extends Configuration {
* TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This
* creates a graceful reduction in the amount of idle resources held
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS =
TEZ_AM_PREFIX + "container.idle.release-timeout-max.millis";
public static final long
@@ -619,6 +707,7 @@ public class TezConfiguration extends Configuration {
* non-session mode. Enables an idle session (not running any DAG) to hold on to a minimum number
* of containers to provide fast response times for the next DAG.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_SESSION_MIN_HELD_CONTAINERS =
TEZ_AM_PREFIX + "session.min.held-containers";
public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0;
@@ -632,6 +721,7 @@ public class TezConfiguration extends Configuration {
* the cost of losing work. Setting to 0 turns off preemption. Expert level
* setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_PREEMPTION_PERCENTAGE =
TEZ_AM_PREFIX + "preemption.percentage";
public static final int TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT = 10;
@@ -641,6 +731,7 @@ public class TezConfiguration extends Configuration {
* more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the
* RM can act on the released resources and assign new ones to us. Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS =
TEZ_AM_PREFIX + "preemption.heartbeats-between-preemptions";
public static final int TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS_DEFAULT = 3;
@@ -666,6 +757,7 @@ public class TezConfiguration extends Configuration {
* </ul>
* </ol>
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_LIB_URIS = TEZ_PREFIX + "lib.uris";
/**
@@ -680,6 +772,7 @@ public class TezConfiguration extends Configuration {
* All duplicate resources are ignored.
*
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AUX_URIS = TEZ_PREFIX + "aux.uris";
/**
@@ -687,6 +780,7 @@ public class TezConfiguration extends Configuration {
* raw Tez application where classpath is propagated with application
* via {@link LocalResource}s. This is mainly useful for developer/debugger scenarios.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_IGNORE_LIB_URIS = TEZ_PREFIX + "ignore.lib.uris";
/**
@@ -695,6 +789,7 @@ public class TezConfiguration extends Configuration {
* This is disabled by default - with the expectation being that tez.lib.uris has a complete
* tez-deployment which contains the hadoop libraries.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_USE_CLUSTER_HADOOP_LIBS = TEZ_PREFIX + "use.cluster.hadoop-libs";
public static final boolean TEZ_USE_CLUSTER_HADOOP_LIBS_DEFAULT = false;
@@ -705,6 +800,7 @@ public class TezConfiguration extends Configuration {
* This will be prepended to the classpath before all framework specific components have been
* specified.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX =
TEZ_PREFIX + "cluster.additional.classpath.prefix";
@@ -721,6 +817,7 @@ public class TezConfiguration extends Configuration {
* AM then this timeout may be hit. In those case, using non-session mode is recommended if
* applicable. Otherwise increase the timeout (set to -1 for infinity. Not recommended)
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_SESSION_CLIENT_TIMEOUT_SECS =
TEZ_SESSION_PREFIX + "client.timeout.secs";
public static final int TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT =
@@ -730,6 +827,7 @@ public class TezConfiguration extends Configuration {
* Int value. Time (in seconds) for which the Tez AM should wait for a DAG to be submitted before
* shutting down. Only relevant in session mode.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS =
TEZ_SESSION_PREFIX + "am.dag.submit.timeout.secs";
public static final int TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT =
@@ -738,12 +836,14 @@ public class TezConfiguration extends Configuration {
/**
* String value. The queue name for all jobs being submitted from a given client.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_QUEUE_NAME = TEZ_PREFIX + "queue.name";
@Unstable
/**
* Boolean value. Generate debug artifacts like DAG plan in text format.
*/
+ @ConfigurationScope(Scope.DAG)
public static final String TEZ_GENERATE_DEBUG_ARTIFACTS =
TEZ_PREFIX + "generate.debug.artifacts";
public static final boolean TEZ_GENERATE_DEBUG_ARTIFACTS_DEFAULT = false;
@@ -760,6 +860,7 @@ public class TezConfiguration extends Configuration {
* v[] - Additional launch-cmd options for all tasks in vertex v
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST = TEZ_PREFIX + "task-specific" +
".launch.cmd-opts.list";
@@ -771,6 +872,7 @@ public class TezConfiguration extends Configuration {
* "-agentpath:libpagent.so,dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__"
*/
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS = TEZ_PREFIX + "task-specific" +
".launch.cmd-opts";
@@ -791,12 +893,14 @@ public class TezConfiguration extends Configuration {
* org.apache.hadoop.ipc=INFO;org.apache.hadoop.security=INFO is not valid
* */
@Unstable
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_TASK_SPECIFIC_LOG_LEVEL = TEZ_PREFIX + "task-specific" + ".log.level";
/**
* String value that is a class name.
* Specify the class to use for logging history data
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS =
TEZ_PREFIX + "history.logging.service.class";
@@ -808,6 +912,7 @@ public class TezConfiguration extends Configuration {
* container logging directory. This is relevant only when SimpleHistoryLoggingService is being
* used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
TEZ_PREFIX + "simple.history.logging.dir";
@@ -815,6 +920,7 @@ public class TezConfiguration extends Configuration {
* Int value. Maximum errors allowed while logging history data. After crossing this limit history
* logging gets disabled. The job continues to run after this.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS =
TEZ_PREFIX + "simple.history.max.errors";
public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10;
@@ -823,6 +929,7 @@ public class TezConfiguration extends Configuration {
* Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
@@ -832,6 +939,7 @@ public class TezConfiguration extends Configuration {
* Int value. Max no. of events to send in a single batch to ATS.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String YARN_ATS_MAX_EVENTS_PER_BATCH =
TEZ_PREFIX + "yarn.ats.max.events.per.batch";
public static final int YARN_ATS_MAX_EVENTS_PER_BATCH_DEFAULT = 5;
@@ -841,17 +949,20 @@ public class TezConfiguration extends Configuration {
* Int value. Time, in milliseconds, to wait for an event before sending a batch to ATS.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String YARN_ATS_MAX_POLLING_TIME_PER_EVENT = TEZ_PREFIX
+ "yarn.ats.max.polling.time.per.event.millis";
public static final int YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT = 10;
-
+ @ConfigurationScope(Scope.AM)
public static final String YARN_ATS_ACL_DOMAINS_AUTO_CREATE = TEZ_PREFIX
+ "yarn.ats.acl.domains.auto-create";
public static final boolean YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT = true;
+ @ConfigurationScope(Scope.AM)
public static final String YARN_ATS_ACL_SESSION_DOMAIN_ID = TEZ_PREFIX
+ "yarn.ats.acl.session.domain.id";
+ @ConfigurationScope(Scope.DAG)
public static final String YARN_ATS_ACL_DAG_DOMAIN_ID = TEZ_PREFIX
+ "yarn.ats.acl.dag.domain.id";
@@ -859,6 +970,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the
* incomplete DAGs from the previous instance of the app master.
*/
+ @ConfigurationScope(Scope.AM)
public static final String DAG_RECOVERY_ENABLED =
TEZ_PREFIX + "dag.recovery.enabled";
public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
@@ -867,6 +979,7 @@ public class TezConfiguration extends Configuration {
* Int value. Size in bytes for the IO buffer size while processing the recovery file.
* Expert level setting.
*/
+ @ConfigurationScope(Scope.AM)
public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
TEZ_PREFIX + "dag.recovery.io.buffer.size";
public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
@@ -874,6 +987,7 @@ public class TezConfiguration extends Configuration {
/**
* Int value. Number of recovery events to buffer before flushing them to the recovery log.
*/
+ @ConfigurationScope(Scope.AM)
public static final String DAG_RECOVERY_MAX_UNFLUSHED_EVENTS =
TEZ_PREFIX + "dag.recovery.max.unflushed.events";
public static final int DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT = 100;
@@ -881,6 +995,7 @@ public class TezConfiguration extends Configuration {
/**
* Int value. Interval, in seconds, between flushing recovery data to the recovery log.
*/
+ @ConfigurationScope(Scope.AM)
public static final String DAG_RECOVERY_FLUSH_INTERVAL_SECS =
TEZ_PREFIX + "dag.recovery.flush.interval.secs";
public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
@@ -889,6 +1004,7 @@ public class TezConfiguration extends Configuration {
* Boolean value. Enable local mode execution in Tez. Enables tasks to run in the same process as
* the app master. Primarily used for debugging.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_LOCAL_MODE =
TEZ_PREFIX + "local.mode";
@@ -898,6 +1014,7 @@ public class TezConfiguration extends Configuration {
* Tez AM Inline Mode flag. Not valid till Tez-684 get checked-in
*/
@Private
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_INLINE_TASK_EXECUTION_ENABLED =
TEZ_AM_PREFIX + "inline.task.execution.enabled";
@@ -911,6 +1028,7 @@ public class TezConfiguration extends Configuration {
* Int value.
* The maximium number of tasks running in parallel within the app master process.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS =
TEZ_AM_PREFIX + "inline.task.execution.max-tasks";
@@ -935,6 +1053,7 @@ public class TezConfiguration extends Configuration {
/**
* Boolean value. Configuration to enable/disable ACL checks.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_ACLS_ENABLED = TEZ_AM_PREFIX + "acls.enabled";
public static final boolean TEZ_AM_ACLS_ENABLED_DEFAULT = true;
@@ -945,6 +1064,7 @@ public class TezConfiguration extends Configuration {
* Comma separated list of users, followed by whitespace, followed by a comma separated list of
* groups
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_VIEW_ACLS = TEZ_AM_PREFIX + "view-acls";
/**
@@ -954,12 +1074,14 @@ public class TezConfiguration extends Configuration {
* Comma separated list of users, followed by whitespace, followed by a comma separated list of
* groups
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_MODIFY_ACLS = TEZ_AM_PREFIX + "modify-acls";
/**
* Boolean value.
* Disable version check between client and AM/DAG. Default false.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_DISABLE_CLIENT_VERSION_CHECK = TEZ_AM_PREFIX
+ "disable.client-version-check";
public static final boolean TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT = false;
@@ -968,6 +1090,7 @@ public class TezConfiguration extends Configuration {
* Boolean value.
* Allow disabling of Timeline Domains even if Timeline is being used.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS = TEZ_PREFIX
+ "allow.disabled.timeline-domains";
public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
@@ -986,6 +1109,7 @@ public class TezConfiguration extends Configuration {
* For example, "http://uihost:9001/#/tez-app/__APPLICATION_ID__/ will be replaced to
* http://uihost:9001/#/tez-app/application_1421880306565_0001/
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE = TEZ_AM_PREFIX
+ "tez-ui.history-url.template";
public static final String TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT =
@@ -998,6 +1122,7 @@ public class TezConfiguration extends Configuration {
* if the ui is hosted on the default port (80 for http and 443 for https), the port should not
* be specified.
*/
+ @ConfigurationScope(Scope.AM)
public static final String TEZ_HISTORY_URL_BASE = TEZ_PREFIX
+ "tez-ui.history-url.base";
@@ -1006,7 +1131,48 @@ public class TezConfiguration extends Configuration {
    * Allow disabling of the Tez AM webservice. If set to false the Tez-UI wont show progress
    * updates for running application.
    */
+  @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_WEBSERVICE_ENABLE = TEZ_AM_PREFIX
       + "tez-ui.webservice.enable";
   public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
+
+  // TODO only validate property here, value can also be validated if necessary
+  /**
+   * Check that {@code property} may be set at {@code usedScope}.
+   * <p>
+   * Every configuration key declared in this class is annotated with a
+   * {@link ConfigurationScope}. {@link Scope} values are compared by declaration order, and a
+   * key may not be set at a scope that comes after its declared one: for example an
+   * {@link Scope#AM} key cannot be set per DAG or per vertex, a {@link Scope#DAG} key cannot be
+   * set per vertex, while a {@link Scope#VERTEX} key may also be set per DAG. Keys without a
+   * recorded scope (e.g. user-defined properties) are accepted without validation.
+   *
+   * @param property the configuration key being set
+   * @param usedScope the scope at which the caller is setting the key
+   * @throws IllegalStateException if {@code property} has a recorded scope that comes before
+   *         {@code usedScope}
+   */
+  public static void validateProperty(String property, Scope usedScope) {
+    Scope validScope = PropertyScope.get(property);
+    if (validScope == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(property + " is not a standard Tez configuration property and cannot be"
+            + " validated");
+      }
+      return;
+    }
+    if (usedScope.compareTo(validScope) > 0) {
+      throw new IllegalStateException(property + " is set at the scope of " + usedScope
+          + ", but it is only valid in the scope of " + validScope);
+    }
+  }
+
+  /**
+   * Returns the keys that have a recorded {@link ConfigurationScope}. Package-private so that
+   * tests can compare it against the keys declared in this class.
+   */
+  @VisibleForTesting
+  static Set<String> getPropertySet() {
+    return PropertyScope.keySet();
+  }
 }
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 91d8cdf..c8d3df7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -55,6 +55,7 @@ public class Vertex {
   private Resource taskResource;
   private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
+  private final Map<String, String> vertexConf = new HashMap<String, String>();
   private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs
       = new ArrayList<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
   private final List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs
@@ -288,6 +289,17 @@ public class Vertex {
     return taskEnvironment;
   }
 
+  /**
+   * Get the vertex-level configuration overrides added through
+   * {@link #setConf(String, String)}.
+   *
+   * @return the backing map (not a copy) of property names to values. Entries put into it
+   *         directly bypass {@link TezConfiguration#validateProperty(String, Scope)}.
+   */
+  public Map<String, String> getConf() {
+    return vertexConf;
+  }
+
   /**
    * Set the command opts for tasks of this vertex. This method should be used
    * when different vertices have different opts. Else, set the launch opts for '
@@ -386,6 +398,25 @@ public class Vertex {
     return taskLaunchCmdOpts;
   }
 
+  /**
+   * Override a configuration property for this vertex only.
+   * <p>
+   * The key is checked with {@link TezConfiguration#validateProperty(String, Scope)} at
+   * {@link Scope#VERTEX} scope before it is stored; keys that have no recorded Tez scope
+   * (for example user-defined properties) are stored without validation.
+   *
+   * @param property the configuration key
+   * @param value the value to use for this vertex
+   * @return this vertex, to allow call chaining
+   * @throws IllegalStateException if {@code property} is a Tez property that may not be set
+   *         at vertex scope; the value is not stored in that case
+   */
+  public Vertex setConf(String property, String value) {
+    TezConfiguration.validateProperty(property, Scope.VERTEX);
+    this.vertexConf.put(property, value);
+    return this;
+  }
+
   @Override
   public String toString() {
     return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 0539405..dbd569a 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -138,6 +138,7 @@ message VertexPlan {
   repeated RootInputLeafOutputProto inputs = 8;
   repeated RootInputLeafOutputProto outputs = 9;
   optional TezEntityDescriptorProto vertex_manager_plugin = 10;
+  optional ConfigurationProto vertex_conf = 11;
 }
 
 message EdgePlan {
@@ -160,7 +161,7 @@ message DAGPlan {
   required string name = 1;
   repeated VertexPlan vertex = 2;
   repeated EdgePlan edge = 3;
-  optional ConfigurationProto dagKeyValues = 4;
+  optional ConfigurationProto dag_conf = 4;
   optional bytes credentials_binary = 5;
   repeated PlanVertexGroupInfo vertex_groups = 6;
   repeated PlanLocalResource local_resource = 7;
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index dd9feff..8e7f80b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -136,4 +136,61 @@ public class TestDAG {
       Assert.assertTrue(e.getMessage().contains("already defined"));
     }
   }
+
+  @Test(timeout = 5000)
+  public void testDAGConf() {
+    DAG dag = DAG.create("dag1");
+    // it's OK to set custom configuration
+    dag.setConf("unknown_conf", "value");
+
+    // set invalid AM level configuration
+    try {
+      dag.setConf(TezConfiguration.TEZ_AM_SESSION_MODE, String.valueOf(true));
+      Assert.fail("AM-scoped property should be rejected at DAG scope");
+    } catch (IllegalStateException e) {
+      Assert.assertEquals("tez.am.mode.session is set at the scope of DAG,"
+          + " but it is only valid in the scope of AM",
+          e.getMessage());
+    }
+    // set valid DAG level configuration
+    dag.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, String.valueOf(false));
+    // set valid Vertex level configuration
+    dag.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, String.valueOf(3));
+  }
+
+  @Test(timeout = 5000)
+  public void testVertexConf() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor"));
+    // it's OK to set custom property
+    v1.setConf("unknown_conf", "value");
+
+    // set invalid AM level configuration
+    try {
+      v1.setConf(TezConfiguration.TEZ_AM_SESSION_MODE, String.valueOf(true));
+      Assert.fail("AM-scoped property should be rejected at VERTEX scope");
+    } catch (IllegalStateException e) {
+      Assert.assertEquals("tez.am.mode.session is set at the scope of VERTEX,"
+          + " but it is only valid in the scope of AM",
+          e.getMessage());
+    }
+
+    // set invalid DAG level configuration
+    try {
+      v1.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, String.valueOf(false));
+      Assert.fail("DAG-scoped property should be rejected at VERTEX scope");
+    } catch (IllegalStateException e) {
+      Assert.assertEquals("tez.am.commit-all-outputs-on-dag-success is set at the scope of VERTEX,"
+          + " but it is only valid in the scope of DAG",
+          e.getMessage());
+    }
+    // set valid Vertex level configuration
+    v1.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, String.valueOf(3));
+
+    // only the accepted properties are recorded on the vertex
+    Assert.assertEquals("value", v1.getConf().get("unknown_conf"));
+    Assert.assertEquals("3", v1.getConf().get(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS));
+    Assert.assertFalse(v1.getConf().containsKey(TezConfiguration.TEZ_AM_SESSION_MODE));
+    Assert.assertFalse(
+        v1.getConf().containsKey(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS));
+  }
 }
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 74e780c..1964100 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -1044,7 +1044,7 @@ public class TestDAGVerify {
Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
- ConfigurationProto confProto = dagPlan.getDagKeyValues();
+ ConfigurationProto confProto = dagPlan.getDagConf();
boolean foundViewAcls = false;
boolean foundModifyAcls = false;
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
index 9ec4853..4423f16 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestTezConfiguration.java
@@ -18,6 +18,14 @@
package org.apache.tez.dag.api;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
@@ -44,4 +52,28 @@ public class TestTezConfiguration {
Assert.assertNull(tezConf5.get(TezConfiguration.TEZ_LIB_URIS));
}
+  @Test(timeout = 5000)
+  public void testKeySet() throws IllegalAccessException {
+    Class<?> c = TezConfiguration.class;
+    Set<String> expectedKeys = new HashSet<String>();
+    for (Field f : c.getFields()) {
+      if (!f.getName().endsWith("DEFAULT") && f.getType() == String.class) {
+        String value = (String) f.get(null);
+        // skip key prefixes (values ending in '.'), which are not configuration keys themselves
+        if (value != null && !value.endsWith(".")) {
+          expectedKeys.add(value);
+          Assert.assertNotNull("field " + f.getName() + " does not have annotation of ConfigurationScope.",
+              f.getAnnotation(ConfigurationScope.class));
+        }
+      }
+    }
+
+    Set<String> actualKeySet = TezConfiguration.getPropertySet();
+    for (String key : actualKeySet) {
+      if (!expectedKeys.remove(key)) {
+        fail("Found unexpected key: " + key + " in key set");
+      }
+    }
+    assertTrue("Missing keys in key set: " + expectedKeys, expectedKeys.isEmpty());
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 5aca3cf..8fd5626 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -766,15 +766,6 @@ public class DAGAppMaster extends AbstractService {
dagCounter.incrementAndGet());
}
- Iterator<PlanKeyValuePair> iter =
- dagPB.getDagKeyValues().getConfKeyValuesList().iterator();
- Configuration dagConf = new Configuration(amConf);
-
- while (iter.hasNext()) {
- PlanKeyValuePair keyValPair = iter.next();
- dagConf.set(keyValPair.getKey(), keyValPair.getValue());
- }
-
Credentials dagCredentials = null;
if (dagPB.hasCredentialsBinary()) {
dagCredentials = DagTypeConverters.convertByteStringToCredentials(dagPB
@@ -788,7 +779,7 @@ public class DAGAppMaster extends AbstractService {
// create single dag
DAGImpl newDag =
- new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
+ new DAGImpl(dagId, amConf, dagPB, dispatcher.getEventHandler(),
taskAttemptListener, dagCredentials, clock,
appMasterUgi.getShortUserName(),
taskHeartbeatHandler, context);
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 74b4080..291a0c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -24,6 +24,7 @@ import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -151,4 +152,5 @@ public interface Vertex extends Comparable<Vertex> {
public int getKilledTaskAttemptCount();
+ public Configuration getConf();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index aa7723b..dd38c29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -59,6 +59,7 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -71,6 +72,7 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
@@ -172,7 +174,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
TezCounters fullCounters = null;
private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
- public final Configuration conf;
+ public final Configuration dagConf;
private final DAGPlan jobPlan;
Map<String, LocalResource> localResources;
@@ -407,7 +409,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
public DAGImpl(TezDAGID dagId,
- Configuration conf,
+ Configuration amConf,
DAGPlan jobPlan,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@@ -418,9 +420,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
AppContext appContext) {
this.dagId = dagId;
this.jobPlan = jobPlan;
- this.conf = conf;
+    // Start from the AM configuration and overlay the DAG-level overrides carried in the
+    // DAGPlan. Each key is validated again here, on the AM side, because the plan arrives as a
+    // plain key/value list and nothing in this constructor guarantees it was produced through
+    // the validating client-side API.
+    this.dagConf = new Configuration(amConf);
+    for (PlanKeyValuePair keyValPair : jobPlan.getDagConf().getConfKeyValuesList()) {
+      TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG);
+      this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+    }
this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
-
this.userName = appUserName;
this.clock = clock;
this.appContext = appContext;
@@ -448,9 +457,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
this.aclManager = new ACLManager(appContext.getAMACLManager(), dagUGI.getShortUserName(),
- this.conf);
+ this.dagConf);
- this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(conf);
+ this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
@@ -474,7 +483,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// TODO maybe removed after TEZ-74
@Override
public Configuration getConf() {
- return conf;
+ return dagConf;
}
@Override
@@ -1261,7 +1270,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
initTime = clock.getTime();
}
- commitAllOutputsOnSuccess = conf.getBoolean(
+ commitAllOutputsOnSuccess = dagConf.getBoolean(
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
@@ -1353,7 +1362,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
private static void assignDAGScheduler(DAGImpl dag) {
- String dagSchedulerClassName = dag.conf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
+ String dagSchedulerClassName = dag.dagConf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT);
LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
@@ -1368,7 +1377,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
VertexImpl v = new VertexImpl(
- vertexId, vertexPlan, vertexName, dag.conf,
+ vertexId, vertexPlan, vertexName, dag.dagConf,
dag.eventHandler, dag.taskAttemptListener,
dag.clock, dag.taskHeartbeatHandler,
!dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 05c3cc1..145170c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -71,6 +71,7 @@ import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.Scope;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -85,6 +86,8 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.event.VertexStateUpdateParallelismUpdated;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
@@ -211,7 +214,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private TezCounters fullCounters = null;
private Resource taskResource;
- private Configuration conf;
+ private Configuration vertexConf;
private final boolean isSpeculationEnabled;
@@ -717,7 +720,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOpts;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
- String vertexName, Configuration conf, EventHandler eventHandler,
+ String vertexName, Configuration dagConf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
TaskHeartbeatHandler thh, boolean commitVertexOutputs,
AppContext appContext, VertexLocationHint vertexLocationHint,
@@ -726,7 +729,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
this.vertexName = StringInterner.weakIntern(vertexName);
- this.conf = conf;
+ this.vertexConf = new Configuration(dagConf);
+ // override dag configuration by using vertex's specified configuration
+ if (vertexPlan.hasVertexConf()) {
+ ConfigurationProto confProto = vertexPlan.getVertexConf();
+ for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
+ TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
+ vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
+ }
+ }
this.clock = clock;
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
@@ -761,7 +772,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// Set up log properties, including task specific log properties.
String javaOptsWithoutLoggerMods =
vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
- String logString = conf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
+ String logString = vertexConf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
String [] taskLogParams = TezClientUtils.parseLogParams(logString);
this.javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts(taskLogParams[0], javaOptsWithoutLoggerMods);
@@ -803,11 +814,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.dagVertexGroups = dagVertexGroups;
- isSpeculationEnabled = conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
+ isSpeculationEnabled = vertexConf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
-
+ LOG.info("isSpeculationEnabled:" + isSpeculationEnabled);
if (isSpeculationEnabled()) {
- speculator = new LegacySpeculator(conf, getAppContext(), this);
+ speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
}
logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
@@ -819,6 +830,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
augmentStateMachine();
}
+ @Override
+ public Configuration getConf() {
+ return vertexConf;
+ }
+
private boolean isSpeculationEnabled() {
return isSpeculationEnabled;
}
@@ -2018,7 +2034,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
TaskImpl task =
new TaskImpl(this.getVertexId(), i,
this.eventHandler,
- conf,
+ vertexConf,
this.taskAttemptListener,
this.clock,
this.taskHeartbeatHandler,
@@ -2211,7 +2227,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Setting vertexManager to ShuffleVertexManager for "
+ logIdentifier);
// shuffle vertex manager needs a conf payload
- vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(conf).build(),
+ vertexManager = new VertexManager(ShuffleVertexManager.createConfigBuilder(vertexConf).build(),
dagUgi, this, appContext, stateChangeNotifier);
} else {
// schedule all tasks upon vertex start. Default behavior.
http://git-wip-us.apache.org/repos/asf/tez/blob/eb9a0345/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 599f01e..459ecad 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -64,11 +64,13 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
@@ -441,6 +443,10 @@ public class TestDAGImpl {
LOG.info("Setting up dag plan");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
+ .setDagConf(ConfigurationProto.newBuilder()
+ .addConfKeyValues(PlanKeyValuePair.newBuilder()
+ .setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS)
+ .setValue(3 + "")))
.addVertex(
VertexPlan.newBuilder()
.setName("vertex1")
@@ -460,6 +466,10 @@ public class TestDAGImpl {
.setTaskModule("x1.y1")
.build()
)
+ .setVertexConf(ConfigurationProto.newBuilder()
+ .addConfKeyValues(PlanKeyValuePair.newBuilder()
+ .setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS)
+ .setValue(2+"")))
.addOutEdgeId("e1")
.build()
)
@@ -1117,7 +1127,7 @@ public class TestDAGImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testGroupDAGWithVertexReRunning() {
- conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ groupDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
initDAG(groupDag);
startDAG(groupDag);
dispatcher.await();
@@ -1141,7 +1151,7 @@ public class TestDAGImpl {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testGroupDAGWithVertexReRunningAfterCommit() {
- conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ groupDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
initDAG(groupDag);
startDAG(groupDag);
dispatcher.await();
@@ -1329,7 +1339,7 @@ public class TestDAGImpl {
@Test(timeout=5000)
public void testDAGErrorAbortNonSuccessfulOutputs() {
// vertex success -> vertex output commit. failed dag aborts only non-successful vertices
- conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
+ mrrDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
@@ -1610,6 +1620,22 @@ public class TestDAGImpl {
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
+ @Test(timeout = 5000)
+ public void testConfiguration() throws AMUserCodeException {
+ initDAG(dag);
+ // dag override the default configuration
+ Assert.assertEquals(3, dag.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
+ Vertex v1 = dag.getVertex("vertex1");
+ Vertex v2 = dag.getVertex("vertex2");
+ // v1 override the dagConfiguration
+ Assert.assertEquals(2, v1.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
+ // v2 inherit the configuration from dag
+ Assert.assertEquals(3, v2.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
+ }
+
public static class CustomizedEdgeManager extends EdgeManagerPlugin {
public static enum ExceptionLocation {