You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/18 22:22:32 UTC

[1/2] git commit: TEZ-851. Handle failure to persist events to HDFS. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 3f3f94827 -> 8d87e0398


TEZ-851. Handle failure to persist events to HDFS. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c548c915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c548c915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c548c915

Branch: refs/heads/master
Commit: c548c91576875030862aa909dea68cf8a40673a8
Parents: 3f3f948
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Mar 18 12:32:35 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Mar 18 12:32:35 2014 -0700

----------------------------------------------------------------------
 docs/src/site/site.xml                          |   2 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   7 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  47 ++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  18 ++-
 .../tez/dag/history/HistoryEventHandler.java    |  18 ++-
 .../dag/history/recovery/RecoveryService.java   | 131 +++++++++----------
 6 files changed, 132 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/docs/src/site/site.xml
----------------------------------------------------------------------
diff --git a/docs/src/site/site.xml b/docs/src/site/site.xml
index ddc9d03..c08b02d 100644
--- a/docs/src/site/site.xml
+++ b/docs/src/site/site.xml
@@ -102,7 +102,7 @@
       <item name="Design Doc" href="https://issues.apache.org/jira/secure/attachment/12588887/Tez%20Design%20v1.1.pdf"/>
       <item name="Talks and Meetup Recordings" href="talks.html"/>
       <item name="Apache Incubator Proposal for Tez" href="http://wiki.apache.org/incubator/TezProposal"/>
-      <item name="Project License" href="http://www.apache.org/licenses/"/>
+      <item name="Project License" href="http://www.apache.org/licenses/LICENSE-2.0.txt"/>
     </menu>
 
     <menu name="Community">

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9a01090..6db1647 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1713,7 +1713,12 @@ public class DAGAppMaster extends AbstractService {
     // for an app later
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
         submitTime, dagPlan, this.appAttemptID);
-    historyEventHandler.handle(new DAGHistoryEvent(newDAG.getID(), submittedEvent));
+    try {
+      historyEventHandler.handleCriticalEvent(
+          new DAGHistoryEvent(newDAG.getID(), submittedEvent));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     startDAG(newDAG);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 8fc278f..ed73433 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -729,8 +729,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     boolean failedWhileCommitting = false;
     if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
       // commit all shared outputs
-      appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-          new DAGCommitStartedEvent(getID(), clock.getTime())));
+      try {
+        appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+            new DAGCommitStartedEvent(getID(), clock.getTime())));
+      } catch (IOException e) {
+        LOG.error("Failed to send commit event to history/recovery handler", e);
+        return false;
+      }
       for (VertexGroupInfo groupInfo : vertexGroups.values()) {
         if (failedWhileCommitting) {
           break;
@@ -1629,24 +1634,36 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           }
           groupInfo.committed = true;
           Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
-          appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-              new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
-                  clock.getTime())));
-          for (String outputName : groupInfo.outputs) {
-            OutputCommitter committer = v.getOutputCommitters().get(outputName);
-            LOG.info("Committing output: " + outputName);
-            if (!commitOutput(outputName, committer)) {
-              // using same logic as vertex level commit. stop after first failure.
-              failedCommit = true;
-              break;
+          try {
+            appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+                new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
+                    clock.getTime())));
+          } catch (IOException e) {
+            LOG.error("Failed to send commit recovery event to handler", e);
+            failedCommit = true;
+          }
+          if (!failedCommit) {
+            for (String outputName : groupInfo.outputs) {
+              OutputCommitter committer = v.getOutputCommitters().get(outputName);
+              LOG.info("Committing output: " + outputName);
+              if (!commitOutput(outputName, committer)) {
+                // using same logic as vertex level commit. stop after first failure.
+                failedCommit = true;
+                break;
+              }
             }
           }
           if (failedCommit) {
             break;
           }
-          appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
-              new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
-                  clock.getTime())));
+          try {
+            appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+                new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
+                    clock.getTime())));
+          } catch (IOException e) {
+            LOG.error("Failed to send commit recovery event to handler", e);
+            failedCommit = true;
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1010ab0..46c91b1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -1264,13 +1265,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         new DAGHistoryEvent(getDAGId(), startEvt));
   }
 
-  void logJobHistoryVertexFinishedEvent() {
+  void logJobHistoryVertexFinishedEvent() throws IOException {
     this.setFinishTime();
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
         vertexName, initTimeRequested, initedTime, startTimeRequested,
         startedTime, finishTime, VertexState.SUCCEEDED, "",
         getAllCounters(), getVertexStats());
-    this.appContext.getHistoryHandler().handle(
+    this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
 
@@ -1439,7 +1440,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       case SUCCEEDED:
         eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
             finalState));
-        logJobHistoryVertexFinishedEvent();
+        try {
+          logJobHistoryVertexFinishedEvent();
+        } catch (IOException e) {
+          LOG.error("Failed to send vertex finished event to recovery", e);
+          finalState = VertexState.ERROR;
+          eventHandler.handle(new DAGEvent(getDAGId(),
+              DAGEventType.INTERNAL_ERROR));
+        }
         break;
       default:
         throw new TezUncheckedException("Unexpected VertexState: " + finalState);
@@ -1830,7 +1838,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             break;
           }
           assert vertex.tasks.size() == vertex.numTasks;
-          if (vertex.tasks != null) {
+          if (vertex.tasks != null && vertex.numTasks != 0) {
             for (Task task : vertex.tasks.values()) {
               vertex.eventHandler.handle(
                   new TaskEventRecoverTask(task.getTaskId()));
@@ -2100,7 +2108,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             break;
           }
           assert vertex.tasks.size() == vertex.numTasks;
-          if (vertex.tasks != null) {
+          if (vertex.tasks != null && vertex.numTasks != 0) {
             for (Task task : vertex.tasks.values()) {
               vertex.eventHandler.handle(
                   new TaskEventRecoverTask(task.getTaskId()));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 866cdc4..413d4ef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -28,6 +28,7 @@ import org.apache.tez.dag.history.ats.ATSService;
 import org.apache.tez.dag.history.recovery.RecoveryService;
 import org.apache.tez.dag.records.TezDAGID;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class HistoryEventHandler extends CompositeService {
@@ -76,7 +77,14 @@ public class HistoryEventHandler extends CompositeService {
     super.serviceStop();
   }
 
-  public void handle(DAGHistoryEvent event) {
+  /**
+   * Used by events that are critical for recovery.
+   * DAG submission/finish and any commit-related activities are critical events.
+   * In short, any events that are instances of SummaryEvent.
+   * @param event History event
+   * @throws IOException
+   */
+  public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
     TezDAGID dagId = event.getDagID();
     String dagIdStr = "N/A";
     if(dagId != null) {
@@ -102,6 +110,14 @@ public class HistoryEventHandler extends CompositeService {
         + ": " + event.getHistoryEvent().toString());
   }
 
+  public void handle(DAGHistoryEvent event) {
+    try {
+      handleCriticalEvent(event);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c548c915/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 0074a4c..1353151 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -122,7 +122,8 @@ public class RecoveryService extends AbstractService {
               ++eventsProcessed;
               handleRecoveryEvent(event);
             } catch (Exception e) {
-              // TODO handle failures - treat as fatal or ignore?
+              // For now, ignore any such errors as these are non-critical
+              // All summary event related errors are handled as critical
               LOG.warn("Error handling recovery event", e);
             }
           }
@@ -161,7 +162,7 @@ public class RecoveryService extends AbstractService {
     }
   }
 
-  public void handle(DAGHistoryEvent event) {
+  public void handle(DAGHistoryEvent event) throws IOException {
     if (stopped.get()) {
       LOG.warn("Igoring event as service stopped, eventType"
           + event.getHistoryEvent().getEventType());
@@ -228,13 +229,13 @@ public class RecoveryService extends AbstractService {
               } catch (IOException ioe) {
                 LOG.warn("Error when trying to flush/close recovery file for"
                     + " dag, dagId=" + event.getDagID());
-                // FIXME handle error ?
               }
             }
           }
-        } catch (Exception e) {
-          // FIXME handle failures
-          LOG.warn("Error handling recovery event", e);
+        } catch (IOException ioe) {
+          LOG.warn("Error handling summary event"
+              + ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
+          throw ioe;
         }
       }
     } else {
@@ -248,39 +249,32 @@ public class RecoveryService extends AbstractService {
 
   private void handleSummaryEvent(TezDAGID dagID,
       HistoryEventType eventType,
-      SummaryEvent summaryEvent) {
+      SummaryEvent summaryEvent) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Handling summary event"
           + ", dagID=" + dagID
           + ", eventType=" + eventType);
     }
-    try {
-      if (summaryStream == null) {
-        Path summaryPath = new Path(recoveryPath,
-            appContext.getApplicationID()
-                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
-        if (!recoveryDirFS.exists(summaryPath)) {
-          summaryStream = recoveryDirFS.create(summaryPath, false,
-              bufferSize);
-        } else {
-          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
-        }
+    if (summaryStream == null) {
+      Path summaryPath = new Path(recoveryPath,
+          appContext.getApplicationID()
+              + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+      if (!recoveryDirFS.exists(summaryPath)) {
+        summaryStream = recoveryDirFS.create(summaryPath, false,
+            bufferSize);
+      } else {
+        summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Writing recovery event to summary stream"
-            + ", dagId=" + dagID
-            + ", eventType=" + eventType);
-      }
-      summaryEvent.toSummaryProtoStream(summaryStream);
-    } catch (IOException ioe) {
-      // FIXME handle failures
-      LOG.warn("Failed to write to stream", ioe);
     }
-
-
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing recovery event to summary stream"
+          + ", dagId=" + dagID
+          + ", eventType=" + eventType);
+    }
+    summaryEvent.toSummaryProtoStream(summaryStream);
   }
 
-  private void handleRecoveryEvent(DAGHistoryEvent event) {
+  private void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
     HistoryEventType eventType = event.getHistoryEvent().getEventType();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Handling recovery event of type "
@@ -300,56 +294,57 @@ public class RecoveryService extends AbstractService {
       return;
     }
 
-    try {
-
-      if (!outputStreamMap.containsKey(dagID)) {
-        Path dagFilePath = new Path(recoveryPath,
-            dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-        FSDataOutputStream outputStream;
-        if (recoveryDirFS.exists(dagFilePath)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Opening DAG recovery file in append mode"
-                + ", filePath=" + dagFilePath);
-          }
-          outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Opening DAG recovery file in create mode"
-                + ", filePath=" + dagFilePath);
-          }
-          outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
+    if (!outputStreamMap.containsKey(dagID)) {
+      Path dagFilePath = new Path(recoveryPath,
+          dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+      FSDataOutputStream outputStream;
+      if (recoveryDirFS.exists(dagFilePath)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Opening DAG recovery file in append mode"
+              + ", filePath=" + dagFilePath);
         }
-        outputStreamMap.put(dagID, outputStream);
+        outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Opening DAG recovery file in create mode"
+              + ", filePath=" + dagFilePath);
+        }
+        outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
       }
+      outputStreamMap.put(dagID, outputStream);
+    }
 
-      FSDataOutputStream outputStream = outputStreamMap.get(dagID);
+    FSDataOutputStream outputStream = outputStreamMap.get(dagID);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Writing recovery event to output stream"
-            + ", dagId=" + dagID
-            + ", eventType=" + eventType);
-      }
-      ++unflushedEventsCount;
-      outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
-      event.getHistoryEvent().toProtoStream(outputStream);
-      if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
-          HistoryEventType.DAG_FINISHED).contains(eventType)) {
-        maybeFlush(outputStream);
-      }
-    } catch (IOException ioe) {
-      // FIXME handle failures
-      LOG.warn("Failed to write to stream", ioe);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing recovery event to output stream"
+          + ", dagId=" + dagID
+          + ", eventType=" + eventType);
+    }
+    ++unflushedEventsCount;
+    outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
+    event.getHistoryEvent().toProtoStream(outputStream);
+    if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
+        HistoryEventType.DAG_FINISHED).contains(eventType)) {
+      maybeFlush(outputStream);
     }
-
   }
 
   private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
     long currentTime = appContext.getClock().getTime();
     boolean doFlush = false;
     if (unflushedEventsCount >= maxUnflushedEvents) {
+      if  (LOG.isDebugEnabled()) {
+        LOG.debug("Max unflushed events count reached. Flushing recovery data"
+            + ", unflushedEventsCount=" + unflushedEventsCount
+            + ", maxUnflushedEvents=" + maxUnflushedEvents);
+      }
       doFlush = true;
     } else if (flushInterval >= 0
         && ((currentTime - lastFlushTime) >= (flushInterval*1000))) {
+      LOG.debug("Flush interval time period elapsed. Flushing recovery data"
+          + ", lastTimeSinceFLush=" + lastFlushTime
+          + ", timeSinceLastFlush=" + (currentTime - lastFlushTime));
       doFlush = true;
     }
     if (!doFlush) {
@@ -369,9 +364,9 @@ public class RecoveryService extends AbstractService {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flushing output stream"
           + ", lastTimeSinceFLush=" + lastFlushTime
+          + ", timeSinceLastFlush=" + (currentTime - lastFlushTime)
           + ", unflushedEventsCount=" + unflushedEventsCount
-          + ", maxUnflushedEvents=" + maxUnflushedEvents
-          + ", currentTime=" + currentTime);
+          + ", maxUnflushedEvents=" + maxUnflushedEvents);
     }
 
     unflushedEventsCount = 0;


[2/2] git commit: TEZ-944. Tez Job gets "Could not load native gpl library" Error. (hitesh)

Posted by hi...@apache.org.
TEZ-944. Tez Job gets "Could not load native gpl library" Error. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8d87e039
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8d87e039
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8d87e039

Branch: refs/heads/master
Commit: 8d87e0398bbe0b270822caf06b6d04f6b871e449
Parents: c548c91
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Mar 18 14:22:14 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Mar 18 14:22:14 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/client/AMConfiguration.java | 11 ++++++++---
 .../java/org/apache/tez/dag/api/TezConfiguration.java    |  7 +++++++
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d87e039/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 132a73c..2016ffc 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.client;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 
@@ -59,11 +61,14 @@ public class AMConfiguration {
     }
     this.queueName = this.amConf.get(TezConfiguration.TEZ_QUEUE_NAME);
 
+    this.env = new HashMap<String, String>();
+    TezYARNUtils.setEnvFromInputString(this.env,
+        this.amConf.get(TezConfiguration.TEZ_AM_ENV),
+        File.pathSeparator);
     if (env != null) {
-      this.env = env;
-    } else {
-      this.env = new HashMap<String, String>(0);
+      this.env.putAll(env);
     }
+
     this.localResources = localResources;
     String stagingDirStr = amConf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
     if (stagingDirStr == null || stagingDirStr.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8d87e039/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 29d04ab..1e2f604 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -62,6 +62,13 @@ public class TezConfiguration extends Configuration {
       + "java.opts";
   public static final String TEZ_AM_JAVA_OPTS_DEFAULT = " -Xmx1024m ";
 
+  /** User-provided env for the Tez AM. Any env provided in AMConfiguration
+   * overrides env defined by this config property.
+   * Should be specified as a comma-separated list of key-value pairs where each
+   * pair is defined as KEY=VAL.
+   */
+  public static final String TEZ_AM_ENV = TEZ_AM_PREFIX + "env";
+
   public static final String TEZ_AM_CANCEL_DELEGATION_TOKEN = TEZ_AM_PREFIX +
       "am.complete.cancel.delegation.tokens";
   public static final boolean TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT = true;