You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/10/21 22:37:19 UTC
tez git commit: TEZ-3405. Support ability for AM to kill itself if
there is no client heartbeating to it. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master a1563eff7 -> 1cd62dedc
TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1cd62ded
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1cd62ded
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1cd62ded
Branch: refs/heads/master
Commit: 1cd62dedcc1fcf468a7c8a38f9c9ffbd0b28aa77
Parents: a1563ef
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Oct 21 15:33:03 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Oct 21 15:33:03 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
tez-api/findbugs-exclude.xml | 10 ++
.../java/org/apache/tez/client/TezClient.java | 89 +++++++++++++--
.../org/apache/tez/common/TezCommonUtils.java | 58 ++++++++++
.../java/org/apache/tez/common/TezUtils.java | 21 ----
.../apache/tez/dag/api/TezConfiguration.java | 26 ++++-
.../org/apache/tez/dag/api/TezConstants.java | 12 ++
.../org/apache/tez/client/TestTezClient.java | 36 ++++++
.../apache/tez/common/TestTezCommonUtils.java | 71 ++++++++++++
.../org/apache/tez/common/TestTezUtils.java | 22 ----
tez-dag/findbugs-exclude.xml | 2 +
.../java/org/apache/tez/client/LocalClient.java | 2 +-
.../tez/dag/api/client/DAGClientHandler.java | 11 ++
.../tez/dag/api/client/DAGClientServer.java | 3 +
...DAGClientAMProtocolBlockingPBServerImpl.java | 7 ++
.../org/apache/tez/dag/app/DAGAppMaster.java | 73 +++++++++++-
.../dag/api/client/TestDAGClientHandler.java | 2 +
.../org/apache/tez/runtime/task/TezChild.java | 5 +-
.../java/org/apache/tez/test/TestTezJobs.java | 114 +++++++++++++++++++
19 files changed, 503 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dabc704..1afcacb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it.
TEZ-3483. Create basic travis yml file for Tez.
TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml
index 95b9207..10d27f7 100644
--- a/tez-api/findbugs-exclude.xml
+++ b/tez-api/findbugs-exclude.xml
@@ -121,4 +121,14 @@
<Bug pattern="EI_EXPOSE_REP" />
</Match>
+ <Match>
+ <Class name="org.apache.tez.client.TezClient"/>
+ <Or>
+ <Field name="clientTimeout"/>
+ <Field name="frameworkClient"/>
+ <Field name="sessionAppId"/>
+ </Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 780fcb7..29e7a8b 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -22,6 +22,10 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.NumberFormat;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
@@ -81,6 +85,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
/**
@@ -118,8 +123,8 @@ public class TezClient {
private String diagnostics;
@VisibleForTesting
final boolean isSession;
- private boolean sessionStarted = false;
- private boolean sessionStopped = false;
+ private final AtomicBoolean sessionStarted = new AtomicBoolean(false);
+ private final AtomicBoolean sessionStopped = new AtomicBoolean(false);
/** Tokens which will be required for all DAGs submitted to this session. */
private Credentials sessionCredentials = new Credentials();
private long clientTimeout;
@@ -143,6 +148,8 @@ public class TezClient {
private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0);
private FileSystem stagingFs = null;
+ private ScheduledExecutorService amKeepAliveService;
+
private TezClient(String name, TezConfiguration tezConf) {
this(name, tezConf, tezConf.getBoolean(
TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT));
@@ -315,7 +322,7 @@ public class TezClient {
*/
public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
Preconditions.checkNotNull(localFiles);
- if (isSession && sessionStarted) {
+ if (isSession && sessionStarted.get()) {
additionalLocalResources.putAll(localFiles);
}
amConfig.addAMLocalResources(localFiles);
@@ -345,7 +352,7 @@ public class TezClient {
*/
public synchronized void setAppMasterCredentials(Credentials credentials) {
Preconditions
- .checkState(!sessionStarted,
+ .checkState(!sessionStarted.get(),
"Credentials cannot be set after the session App Master has been started");
amConfig.setCredentials(credentials);
}
@@ -433,15 +440,66 @@ public class TezClient {
frameworkClient.submitApplication(appContext);
ApplicationReport appReport = frameworkClient.getApplicationReport(sessionAppId);
LOG.info("The url to track the Tez Session: " + appReport.getTrackingUrl());
- sessionStarted = true;
+ sessionStarted.set(true);
} catch (YarnException e) {
throw new TezException(e);
}
+ long amClientKeepAliveTimeoutIntervalMillis =
+ TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConfig.getTezConfiguration());
+ // Poll at minimum of 1 second interval
+ long pollPeriod = TezCommonUtils.
+ getAMClientHeartBeatPollIntervalMillis(amConfig.getTezConfiguration(),
+ amClientKeepAliveTimeoutIntervalMillis, 10);
+
+ boolean isLocal = amConfig.getTezConfiguration().getBoolean(
+ TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+ if (!isLocal && amClientKeepAliveTimeoutIntervalMillis > 0) {
+ amKeepAliveService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("AMKeepAliveThread #%d").build());
+ amKeepAliveService.scheduleWithFixedDelay(new Runnable() {
+
+ private DAGClientAMProtocolBlockingPB proxy;
+
+ @Override
+ public void run() {
+ proxy = sendAMHeartbeat(proxy);
+ }
+ }, pollPeriod, pollPeriod, TimeUnit.MILLISECONDS);
+ }
+
this.stagingFs = FileSystem.get(amConfig.getTezConfiguration());
}
}
-
+
+ public DAGClientAMProtocolBlockingPB sendAMHeartbeat(DAGClientAMProtocolBlockingPB proxy) {
+ if (sessionStopped.get()) {
+ // Ignore sending heartbeat as session being stopped
+ return null;
+ }
+ try {
+ if (proxy == null) {
+ try {
+ proxy = waitForProxy();
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while trying to create a connection to the AM", e);
+ }
+ }
+ if (proxy != null) {
+ LOG.debug("Sending heartbeat to AM");
+ proxy.getAMStatus(null, GetAMStatusRequestProto.newBuilder().build());
+ }
+ return proxy;
+ } catch (Exception e) {
+ LOG.info("Exception when sending heartbeat to AM for app {}: {}", sessionAppId,
+ e.getMessage());
+ LOG.debug("Error when sending heartbeat ping to AM. Resetting AM proxy for app: {}"
+ + " due to exception :", sessionAppId, e);
+ return null;
+ }
+ }
+
/**
* Submit a DAG. <br>In non-session mode, it submits a new App Master to the
* cluster.<br>In session mode, it submits the DAG to the session App Master. It
@@ -566,11 +624,14 @@ public class TezClient {
*/
public synchronized void stop() throws TezException, IOException {
try {
- if (sessionStarted) {
+ if (amKeepAliveService != null) {
+ amKeepAliveService.shutdownNow();
+ }
+ if (sessionStarted.get()) {
LOG.info("Shutting down Tez Session"
+ ", sessionName=" + clientName
+ ", applicationId=" + sessionAppId);
- sessionStopped = true;
+ sessionStopped.set(true);
boolean sessionShutdownSuccessful = false;
try {
DAGClientAMProtocolBlockingPB proxy = getAMProxy(sessionAppId);
@@ -914,9 +975,9 @@ public class TezClient {
private void verifySessionStateForSubmission() throws SessionNotRunning {
Preconditions.checkState(isSession, "Invalid without session mode");
- if (!sessionStarted) {
+ if (!sessionStarted.get()) {
throw new SessionNotRunning("Session not started");
- } else if (sessionStopped) {
+ } else if (sessionStopped.get()) {
throw new SessionNotRunning("Session stopped by user");
}
}
@@ -1031,6 +1092,14 @@ public class TezClient {
append(tezDagIdFormat.get().format(1)).toString();
}
+ @VisibleForTesting
+ @Private
+ public synchronized void cancelAMKeepAlive() {
+ if (amKeepAliveService != null) {
+ amKeepAliveService.shutdownNow();
+ }
+ }
+
/**
* A builder for setting up an instance of {@link org.apache.tez.client.TezClient}
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index afdce39..69e48b2 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -487,4 +487,62 @@ public class TezCommonUtils {
jobToken.write(jobToken_dob);
return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
}
+
+ public static String getSystemPropertiesToLog(Configuration conf) {
+ Collection <String> keys = conf.getTrimmedStringCollection(
+ TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG);
+ if (keys.isEmpty()) {
+ keys = TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT;
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n/************************************************************\n");
+ sb.append("[system properties]\n");
+ for (String key : keys) {
+ sb.append(key).append(": ").append(System.getProperty(key)).append('\n');
+ }
+ sb.append("************************************************************/");
+ return sb.toString();
+ }
+
+ /**
+ * Helper function to get the client-AM heartbeat timeout, raising small positive values to the minimum.
+ * See {@link TezConfiguration#TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS} for more details.
+ * @param conf Configuration object
+ * @return heartbeat timeout in milliseconds. -1 implies disabled; a configured 0 is returned as 0.
+ */
+ public static long getAMClientHeartBeatTimeoutMillis(Configuration conf) {
+ int val = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS,
+ TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_DEFAULT);
+ if (val < 0) {
+ return -1;
+ }
+ if (val > 0 && val < TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM) {
+ return TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000L;
+ }
+ return val * 1000L; // long arithmetic: int multiplication overflows for val > 2147483 seconds
+ }
+
+ /**
+ * Helper function to get the poll interval for client-AM heartbeats, floored at the minimum.
+ * @param conf Configuration object; a positive configured poll interval overrides the derived one
+ * @param heartbeatIntervalMillis Heartbeat interval in milliseconds
+ * @param buckets How many times to poll within the provided heartbeat interval
+ * @return poll interval in milliseconds, or -1 if heartbeatIntervalMillis is not positive
+ */
+ public static long getAMClientHeartBeatPollIntervalMillis(Configuration conf,
+ long heartbeatIntervalMillis,
+ int buckets) {
+ if (heartbeatIntervalMillis <= 0) {
+ return -1;
+ }
+ int pollInterval = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS,
+ TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT);
+ if (pollInterval > 0) {
+ return Math.max(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM,
+ pollInterval);
+ }
+ return Math.max(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM,
+ heartbeatIntervalMillis/buckets);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 3aa1914..dfdf9fa 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -20,7 +20,6 @@ package org.apache.tez.common;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,14 +36,12 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-
/**
* Utility methods for setting up a DAG. Has helpers for setting up log4j configuration, converting
* {@link org.apache.hadoop.conf.Configuration} to {@link org.apache.tez.dag.api.UserPayload} etc.
@@ -68,23 +65,6 @@ public class TezUtils {
TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
}
- public static String getSystemPropertiesToLog(Configuration conf) {
- Collection <String> keys = conf.getTrimmedStringCollection(
- TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG);
- if (keys.isEmpty()) {
- keys = TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT;
- }
- StringBuilder sb = new StringBuilder();
- sb.append("\n/************************************************************\n");
- sb.append("[system properties]\n");
- for (String key : keys) {
- sb.append(key).append(": ").append(System.getProperty(key)).append('\n');
- }
- sb.append("************************************************************/");
- return sb.toString();
- }
-
-
/**
* Convert a Configuration to compressed ByteString using Protocol buffer
*
@@ -201,5 +181,4 @@ public class TezUtils {
return convertToHistoryText(null, conf);
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 77ea4ff..c4272b7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -606,7 +606,7 @@ public class TezConfiguration extends Configuration {
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_CLIENT_THREAD_COUNT =
TEZ_AM_PREFIX + "client.am.thread-count";
- public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1;
+ public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 2;
/**
* String value. Range of ports that the AM can use when binding for client connections. Leave blank
@@ -1677,4 +1677,28 @@ public class TezConfiguration extends Configuration {
"java.vendor","java.version","java.vm.name","java.class.path",
"java.io.tmpdir","user.dir","user.name"));
+ /**
+ * Int value. Time interval (in seconds). If the Tez AM does not receive a heartbeat from the
+ * client within this time interval, it will kill any running DAG and shut down. Required to
+ * recycle orphaned Tez applications where the client is no longer alive. A negative value
+ * can be set to disable this check. For a positive value, the minimum value is 10 seconds.
+ * Positive values below 10 seconds will be reset to the minimum value.
+ * Only relevant in session mode.
+ * This is disabled by default i.e. by default, the Tez AM will go on to
+ * complete the DAG and only kill itself after hitting the DAG submission timeout defined by
+ * {@link #TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS}
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS =
+ TEZ_PREFIX + "am.client.heartbeat.timeout.secs";
+ public static final int TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_DEFAULT = -1;
+
+
+ @Private
+ @ConfigurationScope(Scope.AM)
+ public static final String TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS =
+ TEZ_PREFIX + "am.client.heartbeat.poll.interval.millis";
+ public static final int TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_DEFAULT = -1;
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 6e1cb2d..06b9cb7 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -112,4 +112,16 @@ public class TezConstants {
public static String getTezUberServicePluginName() {
return TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM;
}
+
+ /**
+ * Minimum heartbeat timeout value for the Client to AM heartbeat.
+ */
+ public static final int TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM = 10;
+
+ /**
+ * Minimum polling interval used for the client-AM heartbeat.
+ */
+ public static final long TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS_MINIMUM = 1000;
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 48dfff4..dbbd619 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -794,4 +794,40 @@ public class TestTezClient {
} catch (ApplicationNotFoundException e) {
}
}
+
+ @Test(timeout = 30000)
+ public void testAMClientHeartbeat() throws Exception {
+ TezConfiguration conf = new TezConfiguration();
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 10);
+ final TezClientForTest client = configureAndCreateTezClient(conf);
+ client.start();
+ long start = System.currentTimeMillis();
+ while (true) {
+ if (System.currentTimeMillis() > (start + 5000)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ client.stop();
+ verify(client.sessionAmProxy, atLeast(3)).getAMStatus(any(RpcController.class),
+ any(GetAMStatusRequestProto.class));
+
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, -1);
+ final TezClientForTest client2 = configureAndCreateTezClient(conf);
+ client2.start();
+ start = System.currentTimeMillis();
+ while (true) {
+ if (System.currentTimeMillis() > (start + 5000)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ client2.stop();
+ verify(client2.sessionAmProxy, times(0)).getAMStatus(any(RpcController.class),
+ any(GetAMStatusRequestProto.class));
+
+
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
index a7e6069..5f0b33b 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -314,4 +314,75 @@ public class TestTezCommonUtils {
}
+ @Test (timeout = 5000)
+ public void testAMClientHeartBeatTimeout() {
+ TezConfiguration conf = new TezConfiguration(false);
+
+ // -1 for any negative value
+ Assert.assertEquals(-1,
+ TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, -2);
+ Assert.assertEquals(-1,
+ TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+
+ // For any value > 0 but less than min, revert to min
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS,
+ TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM - 1);
+ Assert.assertEquals(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000,
+ TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+
+ // For val > min, should remain val as configured
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS,
+ TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 2);
+ Assert.assertEquals(TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 2000,
+ TezCommonUtils.getAMClientHeartBeatTimeoutMillis(conf));
+
+ conf = new TezConfiguration(false);
+ Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, -1, 10));
+ Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, -123, 10));
+ Assert.assertEquals(-1, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 0, 10));
+
+ // min poll interval is 1000
+ Assert.assertEquals(1000, TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 600, 10));
+
+ // Poll interval is heartbeat interval/10
+ Assert.assertEquals(2000,
+ TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 10));
+
+ // Configured poll interval ignored
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, -1);
+ Assert.assertEquals(4000,
+ TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 5));
+
+ // Positive poll interval is allowed
+ conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_POLL_INTERVAL_MILLIS, 2000);
+ Assert.assertEquals(2000,
+ TezCommonUtils.getAMClientHeartBeatPollIntervalMillis(conf, 20000, 5));
+
+
+ }
+
+ @Test
+ public void testLogSystemProperties() throws Exception {
+ Configuration conf = new Configuration();
+ // test default logging
+ conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, " ");
+ String value = TezCommonUtils.getSystemPropertiesToLog(conf);
+ for(String key: TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT) {
+ Assert.assertTrue(value.contains(key));
+ }
+
+ // test logging of selected keys
+ String classpath = "java.class.path";
+ String os = "os.name";
+ String version = "java.version";
+ conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, classpath + ", " + os);
+ value = TezCommonUtils.getSystemPropertiesToLog(conf);
+ Assert.assertNotNull(value);
+ Assert.assertTrue(value.contains(classpath));
+ Assert.assertTrue(value.contains(os));
+ Assert.assertFalse(value.contains(version));
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index 2eab776..61bb9a7 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -230,26 +230,4 @@ public class TestTezUtils {
}
- @Test
- public void testLogSystemProperties() throws Exception {
- Configuration conf = new Configuration();
- // test default logging
- conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, " ");
- String value = TezUtils.getSystemPropertiesToLog(conf);
- for(String key: TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG_DEFAULT) {
- assertTrue(value.contains(key));
- }
-
- // test logging of selected keys
- String classpath = "java.class.path";
- String os = "os.name";
- String version = "java.version";
- conf.set(TezConfiguration.TEZ_JVM_SYSTEM_PROPERTIES_TO_LOG, classpath + ", " + os);
- value = TezUtils.getSystemPropertiesToLog(conf);
- assertNotNull(value);
- assertTrue(value.contains(classpath));
- assertTrue(value.contains(os));
- assertFalse(value.contains(version));
- }
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index e8adbb3..c3e099e 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -150,6 +150,8 @@
<Class name="org.apache.tez.dag.app.DAGAppMaster"/>
<Or>
<Field name="context"/>
+ <Field name="clientAMHeartbeatTimeoutIntervalMillis"/>
+ <Field name="clientHandler"/>
<Field name="currentDAG"/>
<Field name="state"/>
<Field name="taskSchedulerManager"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 474f4ca..7c65c07 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -319,8 +319,8 @@ public class LocalClient extends FrameworkClient {
new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath(),
new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
- clientHandler = new DAGClientHandler(dagAppMaster);
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
+ clientHandler = new DAGClientHandler(dagAppMaster);
} catch (Throwable t) {
LOG.error("Error starting DAGAppMaster", t);
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index 0f51eff..618676d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +44,11 @@ public class DAGClientHandler {
private Logger LOG = LoggerFactory.getLogger(DAGClientHandler.class);
private DAGAppMaster dagAppMaster;
+ private final AtomicLong lastHeartbeatTime;
public DAGClientHandler(DAGAppMaster dagAppMaster) {
this.dagAppMaster = dagAppMaster;
+ this.lastHeartbeatTime = new AtomicLong(dagAppMaster.getContext().getClock().getTime());
}
private DAG getCurrentDAG() {
@@ -177,4 +180,12 @@ public class DAGClientHandler {
return dag.getACLManager();
}
+ public void updateLastHeartbeatTime() {
+ lastHeartbeatTime.set(dagAppMaster.getContext().getClock().getTime());
+ }
+
+ public long getLastHeartbeatTime() {
+ return lastHeartbeatTime.get();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
index 38f6740..14de870 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java
@@ -75,6 +75,9 @@ public class DAGClientServer extends AbstractService {
int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT,
TezConfiguration.TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT);
+ if (numHandlers < 2) {
+ numHandlers = 2;
+ }
server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf,
numHandlers, blockingService, TezConfiguration.TEZ_AM_CLIENT_AM_PORT_RANGE);
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index 32124b9..baac186 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -80,6 +80,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager().checkAMViewAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform AM view operation");
}
+ real.updateLastHeartbeatTime();
try{
List<String> dagIds = real.getAllDAGs();
return GetAllDAGsResponseProto.newBuilder().addAllDagId(dagIds).build();
@@ -98,6 +99,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager(dagId).checkDAGViewAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform DAG view operation");
}
+ real.updateLastHeartbeatTime();
DAGStatus status;
status = real.getDAGStatus(dagId,
DagTypeConverters.convertStatusGetOptsFromProto(
@@ -120,6 +122,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager(dagId).checkDAGViewAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform DAG view operation");
}
+ real.updateLastHeartbeatTime();
String vertexName = request.getVertexName();
VertexStatus status = real.getVertexStatus(dagId, vertexName,
DagTypeConverters.convertStatusGetOptsFromProto(
@@ -142,6 +145,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager(dagId).checkDAGModifyAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform DAG modify operation");
}
+ real.updateLastHeartbeatTime();
real.tryKillDAG(dagId);
return TryKillDAGResponseProto.newBuilder().build();
} catch (TezException e) {
@@ -156,6 +160,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager().checkAMModifyAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform AM modify operation");
}
+ real.updateLastHeartbeatTime();
try{
if (request.hasSerializedRequestPath()) {
// need to deserialize large request from hdfs
@@ -190,6 +195,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager().checkAMModifyAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform AM modify operation");
}
+ real.updateLastHeartbeatTime();
try {
real.shutdownAM();
return ShutdownSessionResponseProto.newBuilder().build();
@@ -205,6 +211,7 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
if (!real.getACLManager().checkAMViewAccess(user)) {
throw new AccessControlException("User " + user + " cannot perform AM view operation");
}
+ real.updateLastHeartbeatTime();
try {
TezAppMasterStatus sessionStatus = real.getTezAppMasterStatus();
return GetAMStatusResponseProto.newBuilder().setStatus(
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index de19fa3..062f29d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -49,6 +49,8 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -272,6 +274,11 @@ public class DAGAppMaster extends AbstractService {
private boolean isLocal = false; //Local mode flag
+ // Timeout interval which, if set, causes a running DAG to be
+ // killed and the AM to shut down if the client has not
+ // pinged/heartbeated to the AM within the given time period.
+ private long clientAMHeartbeatTimeoutIntervalMillis = -1;
+
@VisibleForTesting
protected DAGAppMasterShutdownHandler shutdownHandler;
private final AtomicBoolean shutdownHandlerRunning = new AtomicBoolean(false);
@@ -290,6 +297,7 @@ public class DAGAppMaster extends AbstractService {
private long sessionTimeoutInterval;
private long lastDAGCompletionTime;
private Timer dagSubmissionTimer;
+ private ScheduledExecutorService clientAMHeartBeatTimeoutService;
private boolean recoveryEnabled;
private Path recoveryDataDir;
private Path currentRecoveryDataDir;
@@ -599,6 +607,8 @@ public class DAGAppMaster extends AbstractService {
this.sessionTimeoutInterval = 1000 * amConf.getInt(
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
+ this.clientAMHeartbeatTimeoutIntervalMillis =
+ TezCommonUtils.getAMClientHeartBeatTimeoutMillis(amConf);
if (!versionMismatch) {
if (isSession) {
@@ -2109,18 +2119,44 @@ public class DAGAppMaster extends AbstractService {
}
if (isSession) {
- this.dagSubmissionTimer = new Timer(true);
+ this.dagSubmissionTimer = new Timer("DAGSubmissionTimer", true);
this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
checkAndHandleSessionTimeout();
} catch (TezException e) {
- LOG.error("Error when check AM session timeout", e);
+ LOG.error("Error when checking AM session timeout", e);
}
}
}, sessionTimeoutInterval, sessionTimeoutInterval / 10);
}
+
+ // Ignore client heartbeat timeout in local mode or non-session mode
+ if (!isLocal && isSession && clientAMHeartbeatTimeoutIntervalMillis > 0) {
+ // reset heartbeat time
+ clientHandler.updateLastHeartbeatTime();
+ this.clientAMHeartBeatTimeoutService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("ClientAMHeartBeatKeepAliveCheck #%d").build()
+ );
+ this.clientAMHeartBeatTimeoutService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long nextExpiry = checkAndHandleDAGClientTimeout();
+ if (nextExpiry > 0) {
+ clientAMHeartBeatTimeoutService.schedule(this, nextExpiry, TimeUnit.MILLISECONDS);
+ }
+ } catch (TezException e) {
+ // Can only be thrown while the AM is already being shut down, so there is no need to
+ // reschedule the timer task
+ LOG.error("Error when checking Client AM heartbeat timeout", e);
+ }
+ }
+ }, clientAMHeartbeatTimeoutIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+
}
@@ -2137,6 +2173,9 @@ public class DAGAppMaster extends AbstractService {
if (this.dagSubmissionTimer != null) {
this.dagSubmissionTimer.cancel();
}
+ if (this.clientAMHeartBeatTimeoutService != null) {
+ this.clientAMHeartBeatTimeoutService.shutdownNow();
+ }
// release all the held containers before stop services TEZ-2687
initiateStop();
stopServices();
@@ -2273,6 +2312,33 @@ public class DAGAppMaster extends AbstractService {
}
}
+ private long checkAndHandleDAGClientTimeout() throws TezException {
+ if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.RECOVERING).contains(this.state)
+ || sessionStopped.get()) {
+ // AM is new or recovering, so do not kill the session at this time.
+ // If the session has already completed or is shutting down, this should be a no-op.
+ return -1;
+ }
+
+ long currentTime = clock.getTime();
+ long nextExpiry = clientHandler.getLastHeartbeatTime()
+ + clientAMHeartbeatTimeoutIntervalMillis;
+ if (currentTime < nextExpiry) {
+ // reschedule timer to 1 sec after the next expiry window
+ // to ensure that we time out as intended if there are no heartbeats
+ return ((nextExpiry+1000) - currentTime);
+ }
+
+ String message = "Client-to-AM Heartbeat timeout interval expired, shutting down AM as client"
+ + " stopped heartbeating to it"
+ + ", lastClientAMHeartbeatTime=" + clientHandler.getLastHeartbeatTime()
+ + ", clientAMHeartbeatTimeoutIntervalMillis="
+ + clientAMHeartbeatTimeoutIntervalMillis + " ms";
+ addDiagnostic(message);
+ shutdownTezAM(message);
+ return -1;
+ }
+
private synchronized void checkAndHandleSessionTimeout() throws TezException {
if (EnumSet.of(DAGAppMasterState.RUNNING,
DAGAppMasterState.RECOVERING).contains(this.state)
@@ -2287,6 +2353,7 @@ public class DAGAppMaster extends AbstractService {
String message = "Session timed out"
+ ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms"
+ ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms";
+ addDiagnostic(message);
shutdownTezAM(message);
}
@@ -2379,7 +2446,7 @@ public class DAGAppMaster extends AbstractService {
// log the system properties
if (LOG.isInfoEnabled()) {
- String systemPropsToLog = TezUtils.getSystemPropertiesToLog(conf);
+ String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 8a8b776..bf07838 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
@@ -62,6 +63,7 @@ public class TestDAGClientHandler {
AppContext mockAppContext = mock(AppContext.class);
when(mockDagAM.getContext()).thenReturn(mockAppContext);
when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);
+ when(mockAppContext.getClock()).thenReturn(new SystemClock());
DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 2255ed7..022fea3 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -54,7 +54,6 @@ import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezLocalResource;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -66,11 +65,9 @@ import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
-import org.apache.tez.hadoop.shim.HadoopShimProvider;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.slf4j.Logger;
@@ -505,7 +502,7 @@ public class TezChild {
// log the system properties
if (LOG.isInfoEnabled()) {
- String systemPropsToLog = TezUtils.getSystemPropertiesToLog(defaultConf);
+ String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(defaultConf);
if (systemPropsToLog != null) {
LOG.info(systemPropsToLog);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1cd62ded/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index c3e8487..241c6e9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -43,6 +43,8 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -1168,4 +1170,116 @@ public class TestTezJobs {
assertEquals(0, expectedResult.size());
}
+ @Test(timeout = 60000)
+ public void testAMClientHeartbeatTimeout() throws Exception {
+ Path stagingDirPath = new Path("/tmp/timeout-staging-dir");
+ remoteFs.mkdirs(stagingDirPath);
+
+ YarnClient yarnClient = YarnClient.createYarnClient();
+
+ try {
+
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+
+ List<ApplicationReport> apps = yarnClient.getApplications();
+ int appsBeforeCount = apps != null ? apps.size() : 0;
+
+ TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+ tezConf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 5);
+ TezClient tezClient = TezClient.create("testAMClientHeartbeatTimeout", tezConf, true);
+ tezClient.start();
+ tezClient.cancelAMKeepAlive();
+
+ ApplicationId appId = tezClient.getAppMasterApplicationId();
+
+ apps = yarnClient.getApplications();
+ int appsAfterCount = apps != null ? apps.size() : 0;
+
+ // Running in session mode. So should only create 1 more app.
+ Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);
+
+ ApplicationReport report;
+ while (true) {
+ report = yarnClient.getApplicationReport(appId);
+ if (report.getYarnApplicationState() == YarnApplicationState.FINISHED
+ || report.getYarnApplicationState() == YarnApplicationState.FAILED
+ || report.getYarnApplicationState() == YarnApplicationState.KILLED) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ // Add a sleep because YARN does not consistently report up-to-date diagnostics
+ Thread.sleep(2000);
+ report = yarnClient.getApplicationReport(appId);
+ LOG.info("App Report for appId=" + appId
+ + ", report=" + report);
+ Assert.assertTrue("Actual diagnostics: " + report.getDiagnostics(),
+ report.getDiagnostics().contains("Client-to-AM Heartbeat timeout interval expired"));
+
+ } finally {
+ remoteFs.delete(stagingDirPath, true);
+ if (yarnClient != null) {
+ yarnClient.stop();
+ }
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSessionTimeout() throws Exception {
+ Path stagingDirPath = new Path("/tmp/sessiontimeout-staging-dir");
+ remoteFs.mkdirs(stagingDirPath);
+
+ YarnClient yarnClient = YarnClient.createYarnClient();
+
+ try {
+
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+
+ List<ApplicationReport> apps = yarnClient.getApplications();
+ int appsBeforeCount = apps != null ? apps.size() : 0;
+
+ TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+ tezConf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 5);
+ TezClient tezClient = TezClient.create("testSessionTimeout", tezConf, true);
+ tezClient.start();
+
+ ApplicationId appId = tezClient.getAppMasterApplicationId();
+
+ apps = yarnClient.getApplications();
+ int appsAfterCount = apps != null ? apps.size() : 0;
+
+ // Running in session mode. So should only create 1 more app.
+ Assert.assertEquals(appsBeforeCount + 1, appsAfterCount);
+
+ ApplicationReport report;
+ while (true) {
+ report = yarnClient.getApplicationReport(appId);
+ if (report.getYarnApplicationState() == YarnApplicationState.FINISHED
+ || report.getYarnApplicationState() == YarnApplicationState.FAILED
+ || report.getYarnApplicationState() == YarnApplicationState.KILLED) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ // Add a sleep because YARN does not consistently report up-to-date diagnostics
+ Thread.sleep(2000);
+ report = yarnClient.getApplicationReport(appId);
+ LOG.info("App Report for appId=" + appId
+ + ", report=" + report);
+ Assert.assertTrue("Actual diagnostics: " + report.getDiagnostics(),
+ report.getDiagnostics().contains("Session timed out"));
+
+ } finally {
+ remoteFs.delete(stagingDirPath, true);
+ if (yarnClient != null) {
+ yarnClient.stop();
+ }
+ }
+ }
+
+
}