You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/18 22:22:32 UTC
[1/2] git commit: TEZ-851. Handle failure to persist events to HDFS.
(hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 3f3f94827 -> 8d87e0398
TEZ-851. Handle failure to persist events to HDFS. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c548c915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c548c915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c548c915
Branch: refs/heads/master
Commit: c548c91576875030862aa909dea68cf8a40673a8
Parents: 3f3f948
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Mar 18 12:32:35 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Mar 18 12:32:35 2014 -0700
----------------------------------------------------------------------
docs/src/site/site.xml | 2 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 7 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 47 ++++---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 18 ++-
.../tez/dag/history/HistoryEventHandler.java | 18 ++-
.../dag/history/recovery/RecoveryService.java | 131 +++++++++----------
6 files changed, 132 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/docs/src/site/site.xml
----------------------------------------------------------------------
diff --git a/docs/src/site/site.xml b/docs/src/site/site.xml
index ddc9d03..c08b02d 100644
--- a/docs/src/site/site.xml
+++ b/docs/src/site/site.xml
@@ -102,7 +102,7 @@
<item name="Design Doc" href="https://issues.apache.org/jira/secure/attachment/12588887/Tez%20Design%20v1.1.pdf"/>
<item name="Talks and Meetup Recordings" href="talks.html"/>
<item name="Apache Incubator Proposal for Tez" href="http://wiki.apache.org/incubator/TezProposal"/>
- <item name="Project License" href="http://www.apache.org/licenses/"/>
+ <item name="Project License" href="http://www.apache.org/licenses/LICENSE-2.0.txt"/>
</menu>
<menu name="Community">
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9a01090..6db1647 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1713,7 +1713,12 @@ public class DAGAppMaster extends AbstractService {
// for an app later
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
submitTime, dagPlan, this.appAttemptID);
- historyEventHandler.handle(new DAGHistoryEvent(newDAG.getID(), submittedEvent));
+ try {
+ historyEventHandler.handleCriticalEvent(
+ new DAGHistoryEvent(newDAG.getID(), submittedEvent));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
startDAG(newDAG);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 8fc278f..ed73433 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -729,8 +729,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
boolean failedWhileCommitting = false;
if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
// commit all shared outputs
- appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new DAGCommitStartedEvent(getID(), clock.getTime())));
+ try {
+ appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+ new DAGCommitStartedEvent(getID(), clock.getTime())));
+ } catch (IOException e) {
+ LOG.error("Failed to send commit event to history/recovery handler", e);
+ return false;
+ }
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
if (failedWhileCommitting) {
break;
@@ -1629,24 +1634,36 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
groupInfo.committed = true;
Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
- appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
- clock.getTime())));
- for (String outputName : groupInfo.outputs) {
- OutputCommitter committer = v.getOutputCommitters().get(outputName);
- LOG.info("Committing output: " + outputName);
- if (!commitOutput(outputName, committer)) {
- // using same logic as vertex level commit. stop after first failure.
- failedCommit = true;
- break;
+ try {
+ appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+ new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
+ clock.getTime())));
+ } catch (IOException e) {
+ LOG.error("Failed to send commit recovery event to handler", e);
+ failedCommit = true;
+ }
+ if (!failedCommit) {
+ for (String outputName : groupInfo.outputs) {
+ OutputCommitter committer = v.getOutputCommitters().get(outputName);
+ LOG.info("Committing output: " + outputName);
+ if (!commitOutput(outputName, committer)) {
+ // using same logic as vertex level commit. stop after first failure.
+ failedCommit = true;
+ break;
+ }
}
}
if (failedCommit) {
break;
}
- appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
- clock.getTime())));
+ try {
+ appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+ new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
+ clock.getTime())));
+ } catch (IOException e) {
+ LOG.error("Failed to send commit recovery event to handler", e);
+ failedCommit = true;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1010ab0..46c91b1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -17,6 +17,7 @@
package org.apache.tez.dag.app.dag.impl;
+import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -1264,13 +1265,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new DAGHistoryEvent(getDAGId(), startEvt));
}
- void logJobHistoryVertexFinishedEvent() {
+ void logJobHistoryVertexFinishedEvent() throws IOException {
this.setFinishTime();
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
startedTime, finishTime, VertexState.SUCCEEDED, "",
getAllCounters(), getVertexStats());
- this.appContext.getHistoryHandler().handle(
+ this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
@@ -1439,7 +1440,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case SUCCEEDED:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
finalState));
- logJobHistoryVertexFinishedEvent();
+ try {
+ logJobHistoryVertexFinishedEvent();
+ } catch (IOException e) {
+ LOG.error("Failed to send vertex finished event to recovery", e);
+ finalState = VertexState.ERROR;
+ eventHandler.handle(new DAGEvent(getDAGId(),
+ DAGEventType.INTERNAL_ERROR));
+ }
break;
default:
throw new TezUncheckedException("Unexpected VertexState: " + finalState);
@@ -1830,7 +1838,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
}
assert vertex.tasks.size() == vertex.numTasks;
- if (vertex.tasks != null) {
+ if (vertex.tasks != null && vertex.numTasks != 0) {
for (Task task : vertex.tasks.values()) {
vertex.eventHandler.handle(
new TaskEventRecoverTask(task.getTaskId()));
@@ -2100,7 +2108,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
break;
}
assert vertex.tasks.size() == vertex.numTasks;
- if (vertex.tasks != null) {
+ if (vertex.tasks != null && vertex.numTasks != 0) {
for (Task task : vertex.tasks.values()) {
vertex.eventHandler.handle(
new TaskEventRecoverTask(task.getTaskId()));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 866cdc4..413d4ef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.history.ats.ATSService;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class HistoryEventHandler extends CompositeService {
@@ -76,7 +77,14 @@ public class HistoryEventHandler extends CompositeService {
super.serviceStop();
}
- public void handle(DAGHistoryEvent event) {
+ /**
+ * Used by events that are critical for recovery.
+ * DAG submission/finished and any commit-related activities are critical events.
+ * In short, any events that are instances of SummaryEvent.
+ * @param event History event
+ * @throws IOException
+ */
+ public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
TezDAGID dagId = event.getDagID();
String dagIdStr = "N/A";
if(dagId != null) {
@@ -102,6 +110,14 @@ public class HistoryEventHandler extends CompositeService {
+ ": " + event.getHistoryEvent().toString());
}
+ public void handle(DAGHistoryEvent event) {
+ try {
+ handleCriticalEvent(event);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 0074a4c..1353151 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -122,7 +122,8 @@ public class RecoveryService extends AbstractService {
++eventsProcessed;
handleRecoveryEvent(event);
} catch (Exception e) {
- // TODO handle failures - treat as fatal or ignore?
+ // For now, ignore any such errors as these are non-critical
+ // All summary event related errors are handled as critical
LOG.warn("Error handling recovery event", e);
}
}
@@ -161,7 +162,7 @@ public class RecoveryService extends AbstractService {
}
}
- public void handle(DAGHistoryEvent event) {
+ public void handle(DAGHistoryEvent event) throws IOException {
if (stopped.get()) {
LOG.warn("Igoring event as service stopped, eventType"
+ event.getHistoryEvent().getEventType());
@@ -228,13 +229,13 @@ public class RecoveryService extends AbstractService {
} catch (IOException ioe) {
LOG.warn("Error when trying to flush/close recovery file for"
+ " dag, dagId=" + event.getDagID());
- // FIXME handle error ?
}
}
}
- } catch (Exception e) {
- // FIXME handle failures
- LOG.warn("Error handling recovery event", e);
+ } catch (IOException ioe) {
+ LOG.warn("Error handling summary event"
+ + ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
+ throw ioe;
}
}
} else {
@@ -248,39 +249,32 @@ public class RecoveryService extends AbstractService {
private void handleSummaryEvent(TezDAGID dagID,
HistoryEventType eventType,
- SummaryEvent summaryEvent) {
+ SummaryEvent summaryEvent) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Handling summary event"
+ ", dagID=" + dagID
+ ", eventType=" + eventType);
}
- try {
- if (summaryStream == null) {
- Path summaryPath = new Path(recoveryPath,
- appContext.getApplicationID()
- + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
- if (!recoveryDirFS.exists(summaryPath)) {
- summaryStream = recoveryDirFS.create(summaryPath, false,
- bufferSize);
- } else {
- summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
- }
+ if (summaryStream == null) {
+ Path summaryPath = new Path(recoveryPath,
+ appContext.getApplicationID()
+ + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ if (!recoveryDirFS.exists(summaryPath)) {
+ summaryStream = recoveryDirFS.create(summaryPath, false,
+ bufferSize);
+ } else {
+ summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing recovery event to summary stream"
- + ", dagId=" + dagID
- + ", eventType=" + eventType);
- }
- summaryEvent.toSummaryProtoStream(summaryStream);
- } catch (IOException ioe) {
- // FIXME handle failures
- LOG.warn("Failed to write to stream", ioe);
}
-
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to summary stream"
+ + ", dagId=" + dagID
+ + ", eventType=" + eventType);
+ }
+ summaryEvent.toSummaryProtoStream(summaryStream);
}
- private void handleRecoveryEvent(DAGHistoryEvent event) {
+ private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
HistoryEventType eventType = event.getHistoryEvent().getEventType();
if (LOG.isDebugEnabled()) {
LOG.debug("Handling recovery event of type "
@@ -300,56 +294,57 @@ public class RecoveryService extends AbstractService {
return;
}
- try {
-
- if (!outputStreamMap.containsKey(dagID)) {
- Path dagFilePath = new Path(recoveryPath,
- dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
- FSDataOutputStream outputStream;
- if (recoveryDirFS.exists(dagFilePath)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening DAG recovery file in append mode"
- + ", filePath=" + dagFilePath);
- }
- outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening DAG recovery file in create mode"
- + ", filePath=" + dagFilePath);
- }
- outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
+ if (!outputStreamMap.containsKey(dagID)) {
+ Path dagFilePath = new Path(recoveryPath,
+ dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+ FSDataOutputStream outputStream;
+ if (recoveryDirFS.exists(dagFilePath)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening DAG recovery file in append mode"
+ + ", filePath=" + dagFilePath);
}
- outputStreamMap.put(dagID, outputStream);
+ outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening DAG recovery file in create mode"
+ + ", filePath=" + dagFilePath);
+ }
+ outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
}
+ outputStreamMap.put(dagID, outputStream);
+ }
- FSDataOutputStream outputStream = outputStreamMap.get(dagID);
+ FSDataOutputStream outputStream = outputStreamMap.get(dagID);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing recovery event to output stream"
- + ", dagId=" + dagID
- + ", eventType=" + eventType);
- }
- ++unflushedEventsCount;
- outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
- event.getHistoryEvent().toProtoStream(outputStream);
- if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
- HistoryEventType.DAG_FINISHED).contains(eventType)) {
- maybeFlush(outputStream);
- }
- } catch (IOException ioe) {
- // FIXME handle failures
- LOG.warn("Failed to write to stream", ioe);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to output stream"
+ + ", dagId=" + dagID
+ + ", eventType=" + eventType);
+ }
+ ++unflushedEventsCount;
+ outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
+ event.getHistoryEvent().toProtoStream(outputStream);
+ if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
+ HistoryEventType.DAG_FINISHED).contains(eventType)) {
+ maybeFlush(outputStream);
}
-
}
private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
long currentTime = appContext.getClock().getTime();
boolean doFlush = false;
if (unflushedEventsCount >= maxUnflushedEvents) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Max unflushed events count reached. Flushing recovery data"
+ + ", unflushedEventsCount=" + unflushedEventsCount
+ + ", maxUnflushedEvents=" + maxUnflushedEvents);
+ }
doFlush = true;
} else if (flushInterval >= 0
&& ((currentTime - lastFlushTime) >= (flushInterval*1000))) {
+ LOG.debug("Flush interval time period elapsed. Flushing recovery data"
+ + ", lastTimeSinceFLush=" + lastFlushTime
+ + ", timeSinceLastFlush=" + (currentTime - lastFlushTime));
doFlush = true;
}
if (!doFlush) {
@@ -369,9 +364,9 @@ public class RecoveryService extends AbstractService {
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing output stream"
+ ", lastTimeSinceFLush=" + lastFlushTime
+ + ", timeSinceLastFlush=" + (currentTime - lastFlushTime)
+ ", unflushedEventsCount=" + unflushedEventsCount
- + ", maxUnflushedEvents=" + maxUnflushedEvents
- + ", currentTime=" + currentTime);
+ + ", maxUnflushedEvents=" + maxUnflushedEvents);
}
unflushedEventsCount = 0;
[2/2] git commit: TEZ-944. Tez Job gets "Could not load native gpl
library" Error. (hitesh)
Posted by hi...@apache.org.
TEZ-944. Tez Job gets "Could not load native gpl library" Error. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d87e039
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d87e039
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d87e039
Branch: refs/heads/master
Commit: 8d87e0398bbe0b270822caf06b6d04f6b871e449
Parents: c548c91
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Mar 18 14:22:14 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Mar 18 14:22:14 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/tez/client/AMConfiguration.java | 11 ++++++++---
.../java/org/apache/tez/dag/api/TezConfiguration.java | 7 +++++++
2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d87e039/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 132a73c..2016ffc 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.tez.client;
+import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezYARNUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -59,11 +61,14 @@ public class AMConfiguration {
}
this.queueName = this.amConf.get(TezConfiguration.TEZ_QUEUE_NAME);
+ this.env = new HashMap<String, String>();
+ TezYARNUtils.setEnvFromInputString(this.env,
+ this.amConf.get(TezConfiguration.TEZ_AM_ENV),
+ File.pathSeparator);
if (env != null) {
- this.env = env;
- } else {
- this.env = new HashMap<String, String>(0);
+ this.env.putAll(env);
}
+
this.localResources = localResources;
String stagingDirStr = amConf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
if (stagingDirStr == null || stagingDirStr.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d87e039/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 29d04ab..1e2f604 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -62,6 +62,13 @@ public class TezConfiguration extends Configuration {
+ "java.opts";
public static final String TEZ_AM_JAVA_OPTS_DEFAULT = " -Xmx1024m ";
+ /** User-provided env for the Tez AM. Any env provided in AMConfiguration
+ * overrides env defined by this config property.
+ * Should be specified as a comma-separated list of key-value pairs where each pair
+ * is defined as KEY=VAL
+ */
+ public static final String TEZ_AM_ENV = TEZ_AM_PREFIX + "env";
+
public static final String TEZ_AM_CANCEL_DELEGATION_TOKEN = TEZ_AM_PREFIX +
"am.complete.cancel.delegation.tokens";
public static final boolean TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT = true;