You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/10/21 22:37:19 UTC

tez git commit: TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master a1563eff7 -> 1cd62dedc


TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1cd62ded
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1cd62ded
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1cd62ded

Branch: refs/heads/master
Commit: 1cd62dedcc1fcf468a7c8a38f9c9ffbd0b28aa77
Parents: a1563ef
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Oct 21 15:33:03 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Oct 21 15:33:03 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tez-api/findbugs-exclude.xml                    |  10 ++
 .../java/org/apache/tez/client/TezClient.java   |  89 +++++++++++++--
 .../org/apache/tez/common/TezCommonUtils.java   |  58 ++++++++++
 .../java/org/apache/tez/common/TezUtils.java    |  21 ----
 .../apache/tez/dag/api/TezConfiguration.java    |  26 ++++-
 .../org/apache/tez/dag/api/TezConstants.java    |  12 ++
 .../org/apache/tez/client/TestTezClient.java    |  36 ++++++
 .../apache/tez/common/TestTezCommonUtils.java   |  71 ++++++++++++
 .../org/apache/tez/common/TestTezUtils.java     |  22 ----
 tez-dag/findbugs-exclude.xml                    |   2 +
 .../java/org/apache/tez/client/LocalClient.java |   2 +-
 .../tez/dag/api/client/DAGClientHandler.java    |  11 ++
 .../tez/dag/api/client/DAGClientServer.java     |   3 +
 ...DAGClientAMProtocolBlockingPBServerImpl.java |   7 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  73 +++++++++++-
 .../dag/api/client/TestDAGClientHandler.java    |   2 +
 .../org/apache/tez/runtime/task/TezChild.java   |   5 +-
 .../java/org/apache/tez/test/TestTezJobs.java   | 114 +++++++++++++++++++
 19 files changed, 503 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dabc704..1afcacb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it.
   TEZ-3483. Create basic travis yml file for Tez.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml
index 95b9207..10d27f7 100644
--- a/tez-api/findbugs-exclude.xml
+++ b/tez-api/findbugs-exclude.xml
@@ -121,4 +121,14 @@
     <Bug pattern="EI_EXPOSE_REP" />
   </Match>
 
+  <Match>
+    <Class name="org.apache.tez.client.TezClient"/>
+    <Or>
+      <Field name="clientTimeout"/>
+      <Field name="frameworkClient"/>
+      <Field name="sessionAppId"/>
+    </Or>
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 780fcb7..29e7a8b 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -22,6 +22,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.NumberFormat;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
@@ -81,6 +85,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -118,8 +123,8 @@ public class TezClient {
   private String diagnostics;
   @VisibleForTesting
   final boolean isSession;
-  private boolean sessionStarted = false;
-  private boolean sessionStopped = false;
+  private final AtomicBoolean sessionStarted = new AtomicBoolean(false);
+  private final AtomicBoolean sessionStopped = new AtomicBoolean(false);
   /** Tokens which will be required for all DAGs submitted to this session. */
   private Credentials sessionCredentials = new Credentials();
   private long clientTimeout;
@@ -143,6 +148,8 @@ public class TezClient {
   private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0);
   private FileSystem stagingFs = null;
 
+  private ScheduledExecutorService amKeepAliveService;
+
   private TezClient(String name, TezConfiguration tezConf) {
     this(name, tezConf, tezConf.getBoolean(
         TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));    
@@ -315,7 +322,7 @@ public class TezClient {
    */
   public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
     Preconditions.checkNotNull(localFiles);
-    if (isSession && sessionStarted) {
+    if (isSession && sessionStarted.get()) {
       additionalLocalResources.putAll(localFiles);
     }
     amConfig.addAMLocalResources(localFiles);
@@ -345,7 +352,7 @@ public class TezClient {
    */
   public synchronized void setAppMasterCredentials(Credentials credentials) {
     Preconditions
-        .checkState(!sessionStarted,
+        .checkState(!sessionStarted.get(),
             "Credentials cannot be set after the session App Master has been started");
     amConfig.setCredentials(credentials);
   }
@@ -433,15 +440,66 @@ public class TezClient {
         frameworkClient.submitApplication(appContext);
         ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
         LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl());
-        sessionStarted = true;
+        sessionStarted.set(true);
       } catch (YarnException e) {
         throw new TezException(e);
       }
 
+      long amClientKeepAliveTimeoutIntervalMillis =
+          TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration());
+      // Poll at minimum of 1 second interval
+      long pollPeriod = TezCommonUtils.
+          getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(),
+              amClientKeepAliveTimeoutIntervalMillis, 10);
+
+      boolean isLocal = amConfig.getTezConfiguration().getBoolean(
+          TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+      if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) {
+        amKeepAliveService = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true).setNameFormat("AMKeepAliveThread #%d").build());
+        amKeepAliveService.scheduleWithFixedDelay(new Runnable() {
+
+          private DAGClientAMProtocolBlockingPB proxy;
+
+          @Override
+          public void run() {
+            proxy = sendAMHeartbeat(proxy);
+          }
+        }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS);
+      }
+
       this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
     }
   }
-  
+
+  public DAGClientAMProtocolBlockingPB sendAMHeartbeat(DAGClientAMProtocolBlockingPB proxy) {
+    if (sessionStopped.get()) {
+      return null; // Ignore sending heartbeat as session being stopped
+    }
+    try {
+      if (proxy == null) {
+        try {
+          proxy = waitForProxy();
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted while trying to create a connection to the AM", e);
+          Thread.currentThread().interrupt(); // restore flag, e.g. set by shutdownNow()
+        }
+      }
+      if (proxy != null) {
+        LOG.debug("Sending heartbeat to AM");
+        proxy.getAMStatus(null, GetAMStatusRequestProto.newBuilder().build());
+      }
+      return proxy;
+    } catch (Exception e) {
+      LOG.info("Exception when sending heartbeat to AM for app {}: {}", sessionAppId,
+          e.getMessage());
+      LOG.debug("Error when sending heartbeat ping to AM. Resetting AM proxy for app: {}"
+          + " due to exception :", sessionAppId, e);
+      return null;
+    }
+  }
+
   /**
    * Submit a DAG. <br>In non-session mode, it submits a new App Master to the
    * cluster.<br>In session mode, it submits the DAG to the session App Master. It
@@ -566,11 +624,14 @@ public class TezClient {
    */
   public synchronized void stop() throws TezException, IOException {
     try {
-      if (sessionStarted) {
+      if (amKeepAliveService != null) {
+        amKeepAliveService.shutdownNow();
+      }
+      if (sessionStarted.get()) {
         LOG.info("Shutting down Tez Session"
             + ", sessionName=" + clientName
             + ", applicationId=" + sessionAppId);
-        sessionStopped = true;
+        sessionStopped.set(true);
         boolean sessionShutdownSuccessful = false;
         try {
           DAGClientAMProtocolBlockingPB proxy = getAMProxy(sessionAppId);
@@ -914,9 +975,9 @@ public class TezClient {
 
   private void verifySessionStateForSubmission() throws SessionNotRunning {
     Preconditions.checkState(isSession, "Invalid without session mode");
-    if (!sessionStarted) {
+    if (!sessionStarted.get()) {
       throw new SessionNotRunning("Session not started");
-    } else if (sessionStopped) {
+    } else if (sessionStopped.get()) {
       throw new SessionNotRunning("Session stopped by user");
     }
   }
@@ -1031,6 +1092,14 @@ public class TezClient {
          append(tezDagIdFormat.get().format(1)).toString();
   }
 
+  @VisibleForTesting
+  @Private
+  public synchronized void cancelAMKeepAlive() {
+    if (amKeepAliveService != null) {
+      amKeepAliveService.shutdownNow();
+    }
+  }
+
   /**
    * A builder for setting up an instance of {@link org.apache.tez.client.TezClient}
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index afdce39..69e48b2 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -487,4 +487,62 @@ public class TezCommonUtils {
     jobToken.write(jobToken_dob);
     return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
   }
+
+  public static String getSystemPropertiesToLog(Configuration conf) {
+    Collection <String> keys = conf.getTrimmedStringCollection(
+        TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG);
+    if (keys.isEmpty()) {
+      keys = TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT;
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("\n/************************************************************\n");
+    sb.append("[system properties]\n");
+    for (String key : keys) {
+      sb.append(key).append(": ").append(System.getProperty(key)).append('\n');
+    }
+    sb.append("************************************************************/");
+    return sb.toString();
+  }
+
+  /**
+   * Helper function to get the client-AM heartbeat timeout, clamped up to the allowed minimum.
+   * See {@link TezConfiguration#TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS} for more details.
+   * @param conf Configuration object
+   * @return heartbeat timeout in milliseconds. -1 (negative config) or 0 implies disabled.
+   */
+  public static long getAMClientHeartBeatTimeoutMillis(Configuration conf) {
+    int val = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS,
+        TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_DEFAULT);
+    if (val < 0) {
+      return -1;
+    }
+    if (val > 0 && val < TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM) {
+      return TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000L;
+    }
+    return val * 1000L; // long multiply: int val * 1000 overflows past ~24.8 days
+  }
+
+  /**
+   * Helper function to get the poll interval for client-AM heartbeats.
+   * @param conf Configuration object (a positive configured poll interval takes precedence)
+   * @param heartbeatIntervalMillis Heartbeat interval in milliseconds
+   * @param buckets How many times to poll within the heartbeat interval; values < 1 act as 1
+   * @return poll interval in milliseconds, or -1 if heartbeatIntervalMillis is not positive
+   */
+  public static long getAMClientHeartBeatPollIntervalMillis(Configuration conf,
+                                                            long heartbeatIntervalMillis,
+                                                            int buckets) {
+    if (heartbeatIntervalMillis <= 0) {
+      return -1;
+    }
+    int pollInterval = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS,
+        TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT);
+    if (pollInterval > 0) {
+      return Math.max(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM,
+          pollInterval);
+    }
+    return Math.max(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM,
+        heartbeatIntervalMillis / Math.max(1, buckets)); // avoid divide-by-zero/negative
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 3aa1914..dfdf9fa 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -20,7 +20,6 @@ package org.apache.tez.common;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,14 +36,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
-
 /**
  * Utility methods for setting up a DAG. Has helpers for setting up log4j configuration, converting
  * {@link org.apache.hadoop.conf.Configuration} to {@link org.apache.tez.dag.api.UserPayload} etc.
@@ -68,23 +65,6 @@ public class TezUtils {
     TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
   }
 
-  public static String getSystemPropertiesToLog(Configuration conf) {
-    Collection <String> keys = conf.getTrimmedStringCollection(
-        TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG);
-    if (keys.isEmpty()) {
-      keys = TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT;
-    }
-    StringBuilder sb = new StringBuilder();
-    sb.append("\n/************************************************************\n");
-    sb.append("[system properties]\n");
-    for (String key : keys) {
-      sb.append(key).append(": ").append(System.getProperty(key)).append('\n');
-    }
-    sb.append("************************************************************/");
-    return sb.toString();
-  }
-
-
   /**
    * Convert a Configuration to compressed ByteString using Protocol buffer
    *
@@ -201,5 +181,4 @@ public class TezUtils {
     return convertToHistoryText(null, conf);
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 77ea4ff..c4272b7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -606,7 +606,7 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty(type="integer")
   public static final String TEZ_AM_CLIENT_THREAD_COUNT =
       TEZ_AM_PREFIX + "client.am.thread-count";
-  public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1;
+  public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 2;
   
   /**
    * String value. Range of ports that the AM can use when binding for client connections. Leave blank
@@ -1677,4 +1677,28 @@ public class TezConfiguration extends Configuration {
           "java.vendor","java.version","java.vm.name","java.class.path",
           "java.io.tmpdir","user.dir","user.name"));
 
+  /**
+   * Int value. Time interval (in seconds). If the Tez AM does not receive a heartbeat from the
+   * client within this time interval, it will kill any running DAG and shut down. Required to
+   * re-cycle orphaned Tez applications where the client is no longer alive. A negative value
+   * can be set to disable this check. For a positive value, the minimum value is 10 seconds.
+   * Values between 0 and 10 seconds will be reset to the minimum value.
+   * Only relevant in session mode.
+   * This is disabled by default i.e. by default, the Tez AM will go on to
+   * complete the DAG and only kill itself after hitting the DAG submission timeout defined by
+   * {@link #TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS}
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS =
+      TEZ_PREFIX + "am.client.heartbeat.timeout.secs";
+  public static final int TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_DEFAULT = -1;
+
+
+  @Private
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS =
+      TEZ_PREFIX + "am.client.heartbeat.poll.interval.millis";
+  public static final int TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT = -1;
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 6e1cb2d..06b9cb7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -112,4 +112,16 @@ public class TezConstants {
   public static String getTezUberServicePluginName() {
     return TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM;
   }
+
+  /**
+   * Minimum heartbeat timeout value for the Client to AM heartbeat.
+   */
+  public static final int TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM = 10;
+
+  /**
+   * Minimum polling interval used for the client-AM heartbeat.
+   */
+  public static final long TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM = 1000;
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 48dfff4..dbbd619 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -794,4 +794,40 @@ public class TestTezClient {
     } catch (ApplicationNotFoundException e) {
     }
   }
+
+  @Test(timeout = 30000)
+  public void testAMClientHeartbeat() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 10);
+    final TezClientForTest client = configureAndCreateTezClient(conf);
+    client.start();
+    long start = System.currentTimeMillis();
+    while (true) {
+      if (System.currentTimeMillis() > (start + 5000)) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    client.stop();
+    verify(client.sessionAmProxy, atLeast(3)).getAMStatus(any(RpcController.class),
+        any(GetAMStatusRequestProto.class));
+
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, -1);
+    final TezClientForTest client2 = configureAndCreateTezClient(conf);
+    client2.start();
+    start = System.currentTimeMillis();
+    while (true) {
+      if (System.currentTimeMillis() > (start + 5000)) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    client2.stop();
+    verify(client2.sessionAmProxy, times(0)).getAMStatus(any(RpcController.class),
+        any(GetAMStatusRequestProto.class));
+
+
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
index a7e6069..5f0b33b 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -314,4 +314,75 @@ public class TestTezCommonUtils {
 
   }
 
+  @Test (timeout = 5000)
+  public void testAMClientHeartBeatTimeout() {
+    TezConfiguration conf = new TezConfiguration(false);
+
+    // -1 for any negative value
+    Assert.assertEquals(-1,
+        TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, -2);
+    Assert.assertEquals(-1,
+        TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+
+    // For any value > 0 but less than min, revert to min
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS,
+        TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM - 1);
+    Assert.assertEquals(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000,
+        TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+
+    // For val > min, should remain val as configured
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS,
+        TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 2);
+    Assert.assertEquals(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 2000,
+        TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+
+    conf = new TezConfiguration(false);
+    Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, -1, 10));
+    Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, -123, 10));
+    Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 0, 10));
+
+    // min poll interval is 1000
+    Assert.assertEquals(1000, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 600, 10));
+
+    // Poll interval is heartbeat interval/10
+    Assert.assertEquals(2000,
+        TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 10));
+
+    // Configured poll interval ignored
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, -1);
+    Assert.assertEquals(4000,
+        TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 5));
+
+    // Positive poll interval is allowed
+    conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, 2000);
+    Assert.assertEquals(2000,
+        TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 5));
+
+
+  }
+
+  @Test
+  public void testLogSystemProperties() throws Exception {
+    Configuration conf = new Configuration();
+    // test default logging
+    conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, " ");
+    String value = TezCommonUtils.getSystemPropertiesToLog(conf);
+    for(String key: TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT) {
+      Assert.assertTrue(value.contains(key));
+    }
+
+    // test logging of selected keys
+    String classpath = "java.class.path";
+    String os = "os.name";
+    String version = "java.version";
+    conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, classpath + ", " + os);
+    value = TezCommonUtils.getSystemPropertiesToLog(conf);
+    Assert.assertNotNull(value);
+    Assert.assertTrue(value.contains(classpath));
+    Assert.assertTrue(value.contains(os));
+    Assert.assertFalse(value.contains(version));
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index 2eab776..61bb9a7 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -230,26 +230,4 @@ public class TestTezUtils {
 
   }
 
-  @Test
-  public void testLogSystemProperties() throws Exception {
-    Configuration conf = new Configuration();
-    // test default logging
-    conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, " ");
-    String value = TezUtils.getSystemPropertiesToLog(conf);
-    for(String key: TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT) {
-      assertTrue(value.contains(key));
-    }
-
-    // test logging of selected keys
-    String classpath = "java.class.path";
-    String os = "os.name";
-    String version = "java.version";
-    conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, classpath + ", " + os);
-    value = TezUtils.getSystemPropertiesToLog(conf);
-    assertNotNull(value);
-    assertTrue(value.contains(classpath));
-    assertTrue(value.contains(os));
-    assertFalse(value.contains(version));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index e8adbb3..c3e099e 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -150,6 +150,8 @@
     <Class name="org.apache.tez.dag.app.DAGAppMaster"/>
     <Or>
       <Field name="context"/>
+      <Field name="clientAMHeartbeatTimeoutIntervalMillis"/>
+      <Field name="clientHandler"/>
       <Field name="currentDAG"/>
       <Field name="state"/>
       <Field name="taskSchedulerManager"/>

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 474f4ca..7c65c07 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -319,8 +319,8 @@ public class LocalClient extends FrameworkClient {
                   new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath(),
                   new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
                   amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
-          clientHandler = new DAGClientHandler(dagAppMaster);
           DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
+          clientHandler = new DAGClientHandler(dagAppMaster);
 
         } catch (Throwable t) {
           LOG.error("Error starting DAGAppMaster", t);

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index 0f51eff..618676d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +44,11 @@ public class DAGClientHandler {
   private Logger LOG = LoggerFactory.getLogger(DAGClientHandler.class);
 
   private DAGAppMaster dagAppMaster;
+  private final AtomicLong lastHeartbeatTime;
   
   public DAGClientHandler(DAGAppMaster dagAppMaster) {
     this.dagAppMaster = dagAppMaster;
+    this.lastHeartbeatTime = new AtomicLong(dagAppMaster.getContext().getClock().getTime());
   }
 
   private DAG getCurrentDAG() {
@@ -177,4 +180,12 @@ public class DAGClientHandler {
     return dag.getACLManager();
   }
 
+  public void updateLastHeartbeatTime() {
+    lastHeartbeatTime.set(dagAppMaster.getContext().getClock().getTime());
+  }
+
+  public long getLastHeartbeatTime() {
+    return lastHeartbeatTime.get();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 38f6740..14de870 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -75,6 +75,9 @@ public class DAGClientServer extends AbstractService {
 
       int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT,
                           TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT);
+      if (numHandlers < 2) {
+        numHandlers = 2;
+      }
 
       server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf,
                             numHandlers, blockingService, TezConfiguration.TEZ_AM_CLIENT_AM_PORT_RANGE);

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 32124b9..baac186 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -80,6 +80,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
     if (!real.getACLManager().checkAMViewAccess(user)) {
       throw new AccessControlException("User " + user + " cannot perform AM view operation");
     }
+    real.updateLastHeartbeatTime();
     try{
       List<String> dagIds = real.getAllDAGs();
       return GetAllDAGsResponseProto.newBuilder().addAllDagId(dagIds).build();
@@ -98,6 +99,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
       if (!real.getACLManager(dagId).checkDAGViewAccess(user)) {
         throw new AccessControlException("User " + user + " cannot perform DAG view operation");
       }
+      real.updateLastHeartbeatTime();
       DAGStatus status;
       status = real.getDAGStatus(dagId,
         DagTypeConverters.convertStatusGetOptsFromProto(
@@ -120,6 +122,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
       if (!real.getACLManager(dagId).checkDAGViewAccess(user)) {
         throw new AccessControlException("User " + user + " cannot perform DAG view operation");
       }
+      real.updateLastHeartbeatTime();
       String vertexName = request.getVertexName();
       VertexStatus status = real.getVertexStatus(dagId, vertexName,
         DagTypeConverters.convertStatusGetOptsFromProto(
@@ -142,6 +145,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
       if (!real.getACLManager(dagId).checkDAGModifyAccess(user)) {
         throw new AccessControlException("User " + user + " cannot perform DAG modify operation");
       }
+      real.updateLastHeartbeatTime();
       real.tryKillDAG(dagId);
       return TryKillDAGResponseProto.newBuilder().build();
     } catch (TezException e) {
@@ -156,6 +160,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
     if (!real.getACLManager().checkAMModifyAccess(user)) {
       throw new AccessControlException("User " + user + " cannot perform AM modify operation");
     }
+    real.updateLastHeartbeatTime();
     try{
       if (request.hasSerializedRequestPath()) {
         // need to deserialize large request from hdfs
@@ -190,6 +195,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
     if (!real.getACLManager().checkAMModifyAccess(user)) {
       throw new AccessControlException("User " + user + " cannot perform AM modify operation");
     }
+    real.updateLastHeartbeatTime();
     try {
       real.shutdownAM();
       return ShutdownSessionResponseProto.newBuilder().build();
@@ -205,6 +211,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
     if (!real.getACLManager().checkAMViewAccess(user)) {
       throw new AccessControlException("User " + user + " cannot perform AM view operation");
     }
+    real.updateLastHeartbeatTime();
     try {
       TezAppMasterStatus sessionStatus = real.getTezAppMasterStatus();
       return GetAMStatusResponseProto.newBuilder().setStatus(

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index de19fa3..062f29d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -49,6 +49,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -272,6 +274,11 @@ public class DAGAppMaster extends AbstractService {
 
   private boolean isLocal = false; //Local mode flag
 
+  // Timeout interval which if set will cause a running
+  // DAG to be killed and AM shutdown if the client has not
+  // pinged/heartbeated to the AM in the given time period.
+  private long clientAMHeartbeatTimeoutIntervalMillis = -1;
+
   @VisibleForTesting
   protected DAGAppMasterShutdownHandler shutdownHandler;
   private final AtomicBoolean shutdownHandlerRunning = new AtomicBoolean(false);
@@ -290,6 +297,7 @@ public class DAGAppMaster extends AbstractService {
   private long sessionTimeoutInterval;
   private long lastDAGCompletionTime;
   private Timer dagSubmissionTimer;
+  private ScheduledExecutorService clientAMHeartBeatTimeoutService;
   private boolean recoveryEnabled;
   private Path recoveryDataDir;
   private Path currentRecoveryDataDir;
@@ -599,6 +607,8 @@ public class DAGAppMaster extends AbstractService {
     this.sessionTimeoutInterval = 1000 * amConf.getInt(
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
+    this.clientAMHeartbeatTimeoutIntervalMillis =
+        TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConf);
 
     if (!versionMismatch) {
       if (isSession) {
@@ -2109,18 +2119,44 @@ public class DAGAppMaster extends AbstractService {
     }
 
     if (isSession) {
-      this.dagSubmissionTimer = new Timer(true);
+      this.dagSubmissionTimer = new Timer("DAGSubmissionTimer", true);
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
         @Override
         public void run() {
           try {
             checkAndHandleSessionTimeout();
           } catch (TezException e) {
-            LOG.error("Error when check AM session timeout", e);
+            LOG.error("Error when checking AM session timeout", e);
           }
         }
       }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
     }
+
+    // Ignore client heartbeat timeout in local mode or non-session mode
+    if (!isLocal && isSession && clientAMHeartbeatTimeoutIntervalMillis > 0) {
+      // reset heartbeat time
+      clientHandler.updateLastHeartbeatTime();
+      this.clientAMHeartBeatTimeoutService = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setDaemon(true).setNameFormat("ClientAMHeartBeatKeepAliveCheck #%d").build()
+      );
+      this.clientAMHeartBeatTimeoutService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            long nextExpiry = checkAndHandleDAGClientTimeout();
+            if (nextExpiry > 0) {
+              clientAMHeartBeatTimeoutService.schedule(this, nextExpiry, TimeUnit.MILLISECONDS);
+            }
+          } catch (TezException e) {
+            // Cannot be thrown unless the AM is being tried to shutdown so no need to
+            // reschedule the timer task
+            LOG.error("Error when checking Client AM heartbeat timeout", e);
+          }
+        }
+      }, clientAMHeartbeatTimeoutIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
   }
 
 
@@ -2137,6 +2173,9 @@ public class DAGAppMaster extends AbstractService {
       if (this.dagSubmissionTimer != null) {
         this.dagSubmissionTimer.cancel();
       }
+      if (this.clientAMHeartBeatTimeoutService != null) {
+        this.clientAMHeartBeatTimeoutService.shutdownNow();
+      }
       // release all the held containers before stop services TEZ-2687
       initiateStop();
       stopServices();
@@ -2273,6 +2312,45 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  /**
+   * Checks whether the client has stopped heartbeating to this AM and, once the
+   * configured client heartbeat timeout has elapsed, shuts the AM down.
+   *
+   * @return delay in milliseconds until the next check should run, or -1 if no
+   *         further check should be scheduled
+   * @throws TezException propagated from shutdownTezAM
+   */
+  private long checkAndHandleDAGClientTimeout() throws TezException {
+    if (sessionStopped.get()) {
+      // Session already completed or shutting down: nothing left to check.
+      return -1;
+    }
+    if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.RECOVERING).contains(this.state)) {
+      // AM new or recovering, so do not kill the session at this time. Re-check after a
+      // full interval instead of returning -1, which would stop the checks permanently.
+      return clientAMHeartbeatTimeoutIntervalMillis;
+    }
+
+    // Read the heartbeat time once so the expiry check and the diagnostic agree.
+    long lastHeartbeatTime = clientHandler.getLastHeartbeatTime();
+    long currentTime = clock.getTime();
+    long nextExpiry = lastHeartbeatTime + clientAMHeartbeatTimeoutIntervalMillis;
+    if (currentTime < nextExpiry) {
+      // reschedule timer to 1 sec after the next expiry window
+      // to ensure that we time out as intended if there are no heartbeats
+      return (nextExpiry + 1000) - currentTime;
+    }
+
+    String message = "Client-to-AM Heartbeat timeout interval expired, shutting down AM as client"
+        + " stopped heartbeating to it"
+        + ", lastClientAMHeartbeatTime=" + lastHeartbeatTime
+        + ", clientAMHeartbeatTimeoutIntervalMillis="
+        + clientAMHeartbeatTimeoutIntervalMillis + " ms";
+    addDiagnostic(message);
+    shutdownTezAM(message);
+    return -1;
+  }
+
   private synchronized void checkAndHandleSessionTimeout() throws TezException {
     if (EnumSet.of(DAGAppMasterState.RUNNING,
         DAGAppMasterState.RECOVERING).contains(this.state)
@@ -2287,6 +2353,7 @@ public class DAGAppMaster extends AbstractService {
     String message = "Session timed out"
         + ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms"
         + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms";
+    addDiagnostic(message);
     shutdownTezAM(message);
   }
 
@@ -2379,7 +2446,7 @@ public class DAGAppMaster extends AbstractService {
 
       // log the system properties
       if (LOG.isInfoEnabled()) {
-        String systemPropsToLog = TezUtils.getSystemPropertiesToLog(conf);
+        String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
         if (systemPropsToLog != null) {
           LOG.info(systemPropsToLog);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 8a8b776..bf07838 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -62,6 +63,7 @@ public class TestDAGClientHandler {
     AppContext mockAppContext = mock(AppContext.class);
     when(mockDagAM.getContext()).thenReturn(mockAppContext);
     when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);
+    when(mockAppContext.getClock()).thenReturn(new SystemClock());
 
     DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 2255ed7..022fea3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -54,7 +54,6 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezLocalResource;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -66,11 +65,9 @@ import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.hadoop.shim.HadoopShim;
-import org.apache.tez.hadoop.shim.HadoopShimProvider;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.slf4j.Logger;
@@ -505,7 +502,7 @@ public class TezChild {
 
     // log the system properties
     if (LOG.isInfoEnabled()) {
-      String systemPropsToLog = TezUtils.getSystemPropertiesToLog(defaultConf);
+      String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(defaultConf);
       if (systemPropsToLog != null) {
         LOG.info(systemPropsToLog);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index c3e8487..241c6e9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -43,6 +43,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -1168,4 +1170,90 @@ public class TestTezJobs {
     assertEquals(0, expectedResult.size());
   }
 
+  @Test(timeout = 60000)
+  public void testAMClientHeartbeatTimeout() throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 5);
+    // Stop client keep-alive heartbeats so the AM's client heartbeat timeout fires.
+    runSessionAndVerifyShutdownDiagnostics("testAMClientHeartbeatTimeout", tezConf,
+        "/tmp/timeout-staging-dir", true, "Client-to-AM Heartbeat timeout interval expired");
+  }
+
+  @Test(timeout = 60000)
+  public void testSessionTimeout() throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 5);
+    // Keep heartbeating but never submit a DAG so the session timeout fires.
+    runSessionAndVerifyShutdownDiagnostics("testSessionTimeout", tezConf,
+        "/tmp/sessiontimeout-staging-dir", false, "Session timed out");
+  }
+
+  /**
+   * Starts a session AM, waits until YARN reports the application in a terminal
+   * state and asserts that the final diagnostics contain the expected message.
+   *
+   * @param appName name passed to TezClient.create
+   * @param tezConf session configuration; the staging dir is set on it here
+   * @param stagingDir staging directory created before and deleted after the run
+   * @param cancelKeepAlive whether to stop the client's AM keep-alive heartbeats
+   * @param expectedDiagnostic substring expected in the application diagnostics
+   */
+  private void runSessionAndVerifyShutdownDiagnostics(String appName,
+      TezConfiguration tezConf, String stagingDir, boolean cancelKeepAlive,
+      String expectedDiagnostic) throws Exception {
+    Path stagingDirPath = new Path(stagingDir);
+    remoteFs.mkdirs(stagingDirPath);
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    TezClient tezClient = null;
+    try {
+      yarnClient.init(mrrTezCluster.getConfig());
+      yarnClient.start();
+
+      List<ApplicationReport> apps = yarnClient.getApplications();
+      int appsBeforeCount = apps != null ? apps.size() : 0;
+
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+      tezClient = TezClient.create(appName, tezConf, true);
+      tezClient.start();
+      if (cancelKeepAlive) {
+        tezClient.cancelAMKeepAlive();
+      }
+
+      ApplicationId appId = tezClient.getAppMasterApplicationId();
+
+      apps = yarnClient.getApplications();
+      int appsAfterCount = apps != null ? apps.size() : 0;
+
+      // Running in session mode. So should only create 1 more app.
+      Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);
+
+      ApplicationReport report;
+      while (true) {
+        report = yarnClient.getApplicationReport(appId);
+        if (report.getYarnApplicationState() == YarnApplicationState.FINISHED
+            || report.getYarnApplicationState() == YarnApplicationState.FAILED
+            || report.getYarnApplicationState() == YarnApplicationState.KILLED) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+      // Add a sleep because YARN is not consistent in terms of reporting uptodate diagnostics
+      Thread.sleep(2000);
+      report = yarnClient.getApplicationReport(appId);
+      LOG.info("App Report for appId=" + appId
+          + ", report=" + report);
+      Assert.assertTrue("Actual diagnostics: " + report.getDiagnostics(),
+          report.getDiagnostics().contains(expectedDiagnostic));
+    } finally {
+      remoteFs.delete(stagingDirPath, true);
+      yarnClient.stop();
+      // Release the session client too; the AM should already have shut itself down.
+      if (tezClient != null) {
+        tezClient.stop();
+      }
+    }
+  }
+
+
 }