You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2015/11/23 18:19:35 UTC
[5/7] incubator-slider git commit: SLIDER-930 Incorporate Yarn
feature of resetting AM failure count into Slider AM (Sherry Guo via
gourksaha)
SLIDER-930 Incorporate Yarn feature of resetting AM failure count into Slider AM (Sherry Guo via gourksaha)
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1a3fb79d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1a3fb79d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1a3fb79d
Branch: refs/heads/feature/SLIDER-82-pass-3.1
Commit: 1a3fb79d5a78ad772f11b4a121b66c22ab1ba855
Parents: a0d4f93
Author: Gour Saha <go...@apache.org>
Authored: Mon Nov 16 20:27:08 2015 -0800
Committer: Gour Saha <go...@apache.org>
Committed: Mon Nov 16 20:27:08 2015 -0800
----------------------------------------------------------------------
.../org/apache/slider/api/ResourceKeys.java | 12 ++
.../apache/slider/core/conf/MapOperations.java | 15 +++
.../slider/core/launch/AbstractLauncher.java | 27 +++-
.../slider/core/launch/AppMasterLauncher.java | 2 +
.../standalone/TestStandaloneAMRestart.groovy | 122 +++++++++++++++++++
.../TestAppMasterLauncherWithAmReset.java | 92 ++++++++++++++
6 files changed, 269 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
index f481c6a..f92a58d 100644
--- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
@@ -167,4 +167,16 @@ public interface ResourceKeys {
*/
String YARN_LOG_INCLUDE_PATTERNS = "yarn.log.include.patterns";
String YARN_LOG_EXCLUDE_PATTERNS = "yarn.log.exclude.patterns";
+
+ /**
+ * Resource option giving the length, in milliseconds, of the window of
+ * time after which the application master's failure count can be reset to 0.
+ */
+ String YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS =
+ "yarn.resourcemanager.am.retry-count-window-ms";
+
+ /**
+ * The default window for Slider: 300000 milliseconds (5 minutes).
+ */
+ long DEFAULT_AM_RETRY_COUNT_WINDOW_MS = 300000;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
index 5f7b5f0..e58178c 100644
--- a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
+++ b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
@@ -138,6 +138,21 @@ public class MapOperations implements Map<String, String> {
String val = getOption(option, Integer.toString(defVal));
return Integer.decode(val);
}
+
+ /**
+ * Get a long option; use {@link Long#decode(String)} so that decimal,
+ * hexadecimal and octal values are all accepted.
+ *
+ * @param option option name
+ * @param defVal default value
+ * @return parsed value
+ * @throws NumberFormatException if the value cannot be decoded as a long
+ */
+ public long getOptionLong(String option, long defVal) {
+ String val = getOption(option, Long.toString(defVal));
+ return Long.decode(val);
+ }
+
/**
* Get a mandatory integer option; use {@link Integer#decode(String)} so as to take hex
* oct and bin values too.
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
index 93aff08..22bf328 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -279,6 +280,30 @@ public abstract class AbstractLauncher extends Configured {
}
}
+ /**
+ * Read option yarn.resourcemanager.am.retry-count-window-ms from the map,
+ * passing ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS as the default, and
+ * set it as the attempt-failures validity interval of the submission
+ * context. Nothing is set when the map is null.
+ *
+ * @param submissionContext submission context to update
+ * @param map options to read the window from; may be null
+ */
+ public void extractAmRetryCount(ApplicationSubmissionContext submissionContext,
+ Map<String, String> map) {
+
+ if (map != null) {
+ MapOperations options = new MapOperations("", map);
+ long amRetryCountWindow = options.getOptionLong(ResourceKeys
+ .YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS);
+ log.info("Setting {} to {}",
+ ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ amRetryCountWindow);
+ submissionContext.setAttemptFailuresValidityInterval(amRetryCountWindow);
+ }
+ }
+
public void extractLogAggregationContext(Map<String, String> map) {
if (map != null) {
String logPatternSepStr = "\\|";
@@ -423,7 +448,7 @@ public abstract class AbstractLauncher extends Configured {
}
/**
- * Suubmit an entire directory
+ * Submit an entire directory
* @param srcDir src path in filesystem
* @param destRelativeDir relative path under destination local dir
* @throws IOException IO problems
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
index 06dbfea..c82affa 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
@@ -106,6 +106,8 @@ public class AppMasterLauncher extends AbstractLauncher {
submissionContext.setApplicationTags(applicationTags);
}
submissionContext.setNodeLabelExpression(extractLabelExpression(options));
+
+ extractAmRetryCount(submissionContext, resourceGlobalOptions);
extractResourceRequirements(resource, options);
extractLogAggregationContext(resourceGlobalOptions);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
index bdcf615..6947156 100644
--- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
@@ -20,10 +20,13 @@ package org.apache.slider.agent.standalone
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
+
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.slider.agent.AgentMiniClusterTestBase
+import org.apache.slider.api.ResourceKeys
import org.apache.slider.client.SliderClient
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.common.params.ActionAMSuicideArgs
@@ -94,6 +97,125 @@ class TestStandaloneAMRestart extends AgentMiniClusterTestBase {
assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
}
+
+ @Test
+ public void testStandaloneAMRestartWithRetryWindow() throws Throwable {
+ describe "kill a Standalone AM and verify that the AM failure count " +
+ "is reset after the AM retry-count-window elapses"
+ // patch the configuration for AM restart
+ YarnConfiguration conf = getRestartableConfiguration(5)
+
+ int restartLimit = 3;
+ int amRetryWindow = 60000;
+ String amRetryWindowStr = amRetryWindow.toString()
+ String clustername = createMiniCluster("", conf, 1, true)
+ ServiceLauncher<SliderClient> launcher =
+ createStandaloneAMWithArgs(clustername,
+ [
+ Arguments.ARG_DEFINE,
+ SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit,
+ Arguments.ARG_RESOURCE_OPT,
+ ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ amRetryWindowStr
+ ],
+ true,
+ false)
+ SliderClient sliderClient = launcher.service
+ addToTeardown(sliderClient);
+
+ ApplicationReport report = waitForClusterLive(sliderClient)
+ logReport(report)
+ waitUntilClusterLive(sliderClient, 30000)
+
+ def diagnosticArgs = new ActionDiagnosticArgs()
+ diagnosticArgs.client = true
+ diagnosticArgs.yarn = true
+ sliderClient.actionDiagnostic(diagnosticArgs)
+
+ describe "kill AM #1"
+ int iteration = 1;
+ killAMAndWaitForRestart(sliderClient, iteration, clustername)
+
+ describe "kill AM #2"
+ // pre-increment so the iteration sent to the AM matches the kill number
+ killAMAndWaitForRestart(sliderClient, ++iteration, clustername)
+
+ // app should be running here
+ assert 0 == sliderClient.actionExists(clustername, true)
+
+ // make sure the am reset window has elapsed
+ describe "sleeping to ensure reset window elapsed"
+ sleep (amRetryWindow)
+
+ // kill again & expect the app to still be running
+ describe "kill AM #3 after window elapsed"
+ killAMAndWaitForRestart(sliderClient, ++iteration, clustername)
+ assert 0 == sliderClient.actionExists(clustername, true)
+
+ report = sliderClient.applicationReport
+ assert report.getYarnApplicationState() == YarnApplicationState.RUNNING
+
+ logReport(report)
+ describe("stopping the cluster")
+ assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
+
+ report = sliderClient.applicationReport
+ assert report.finalApplicationStatus == FinalApplicationStatus.KILLED
+ }
+
+
+ @Test
+ public void testStandaloneAMRestartWithDefaultRetryWindow() throws Throwable {
+ describe "kill AM more than the max limit allowed within the AM " +
+ "retry-count-window and expect the app to fail"
+ // patch the configuration for AM restart
+ YarnConfiguration conf = getRestartableConfiguration(5)
+
+ int restartLimit = 3;
+ String clustername = createMiniCluster("", conf, 1, true)
+ ServiceLauncher<SliderClient> launcher =
+ createStandaloneAMWithArgs(clustername,
+ [
+ Arguments.ARG_DEFINE,
+ SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit,
+ ],
+ true,
+ false)
+ SliderClient sliderClient = launcher.service
+ addToTeardown(sliderClient);
+
+ ApplicationReport report = waitForClusterLive(sliderClient)
+ logReport(report)
+ waitUntilClusterLive(sliderClient, 30000)
+
+ def diagnosticArgs = new ActionDiagnosticArgs()
+ diagnosticArgs.client = true
+ diagnosticArgs.yarn = true
+ sliderClient.actionDiagnostic(diagnosticArgs)
+
+ describe "kill AM #1"
+ int iteration = 1;
+ killAMAndWaitForRestart(sliderClient, iteration, clustername)
+
+ describe "kill AM #2"
+ // pre-increment so the iteration sent to the AM matches the kill number
+ killAMAndWaitForRestart(sliderClient, ++iteration, clustername)
+
+ // app should be running here
+ assert 0 == sliderClient.actionExists(clustername, true)
+
+ // kill again & expect the app to fail
+ describe "kill AM #3"
+ killAmAndWaitForDeath(sliderClient, ++iteration, clustername)
+ sleep(40000)
+
+ report = sliderClient.applicationReport
+ assert report.finalApplicationStatus == FinalApplicationStatus.FAILED
+
+ logReport(report)
+ describe("stopping the cluster")
+ assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
+ }
+
+
/**
* Kill an AM. take an iteration count for the message sent to the
* AM (hence its logs)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
new file mode 100644
index 0000000..cc64cab
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.launch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.client.SliderYarnClientImpl;
+import org.apache.slider.common.SliderKeys;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link AppMasterLauncher} copies the
+ * yarn.resourcemanager.am.retry-count-window-ms option onto the
+ * attempt-failures validity interval of the application submission
+ * context, and uses the Slider default when the option is absent.
+ */
+public class TestAppMasterLauncherWithAmReset {
+ SliderYarnClientImpl mockYarnClient;
+ // NOTE(review): this mock is replayed by the tests but no expectations
+ // are recorded on it in this class
+ YarnClientApplication yarnClientApp;
+ ApplicationSubmissionContext appSubmissionContext;
+ GetNewApplicationResponse newApp;
+ Set<String> tags = Collections.emptySet();
+ AppMasterLauncher appMasterLauncher = null;
+ // NOTE(review): not read anywhere in this class
+ boolean isOldApi = true;
+
+ /**
+ * Create the mocks and make the mocked YARN client return a
+ * YarnClientApplication built from the mocked new-application response
+ * and a fresh ApplicationSubmissionContext record.
+ */
+ @Before
+ public void initialize() throws Exception {
+ mockYarnClient = EasyMock.createNiceMock(SliderYarnClientImpl.class);
+ yarnClientApp = EasyMock.createNiceMock(YarnClientApplication.class);
+ newApp = EasyMock.createNiceMock(GetNewApplicationResponse.class);
+ EasyMock.expect(mockYarnClient.createApplication())
+ .andReturn(new YarnClientApplication(newApp,
+ Records.newRecord(ApplicationSubmissionContext.class)));
+ }
+
+ /**
+ * An explicit window of 120000 in the options must come back unchanged
+ * from the submission context's attempt-failures validity interval.
+ */
+ @Test
+ public void testExtractYarnResourceManagerAmRetryCountWindowMs() throws
+ Exception {
+ Map<String, String> options = new HashMap<String, String>();
+ final String expectedInterval = Integer.toString (120000);
+ options.put(ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+ expectedInterval);
+ EasyMock.replay(mockYarnClient, yarnClientApp);
+
+ appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
+ null, mockYarnClient, false, null, options, tags);
+
+ ApplicationSubmissionContext ctx = appMasterLauncher.application
+ .getApplicationSubmissionContext();
+ String retryIntervalWindow = Long.toString(ctx
+ .getAttemptFailuresValidityInterval());
+ Assert.assertEquals(expectedInterval, retryIntervalWindow);
+ }
+
+ /**
+ * With an empty options map the validity interval must equal
+ * ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS.
+ */
+ @Test
+ public void testExtractYarnResourceManagerAmRetryCountWindowMsDefaultValue()
+ throws Exception {
+ Map<String, String> options = new HashMap<String, String>();
+ EasyMock.replay(mockYarnClient, yarnClientApp);
+
+ appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
+ null, mockYarnClient, false, null, options, tags);
+
+ ApplicationSubmissionContext ctx = appMasterLauncher.application
+ .getApplicationSubmissionContext();
+ long retryIntervalWindow = ctx.getAttemptFailuresValidityInterval();
+ Assert.assertEquals(ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS,
+ retryIntervalWindow);
+ }
+
+}