You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2015/11/17 05:28:01 UTC

incubator-slider git commit: SLIDER-930 Incorporate Yarn feature of resetting AM failure count into Slider AM (Sherry Guo via gourksaha)

Repository: incubator-slider
Updated Branches:
  refs/heads/develop a0d4f93d8 -> 1a3fb79d5


SLIDER-930 Incorporate Yarn feature of resetting AM failure count into Slider AM (Sherry Guo via gourksaha)


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1a3fb79d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1a3fb79d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1a3fb79d

Branch: refs/heads/develop
Commit: 1a3fb79d5a78ad772f11b4a121b66c22ab1ba855
Parents: a0d4f93
Author: Gour Saha <go...@apache.org>
Authored: Mon Nov 16 20:27:08 2015 -0800
Committer: Gour Saha <go...@apache.org>
Committed: Mon Nov 16 20:27:08 2015 -0800

----------------------------------------------------------------------
 .../org/apache/slider/api/ResourceKeys.java     |  12 ++
 .../apache/slider/core/conf/MapOperations.java  |  15 +++
 .../slider/core/launch/AbstractLauncher.java    |  27 +++-
 .../slider/core/launch/AppMasterLauncher.java   |   2 +
 .../standalone/TestStandaloneAMRestart.groovy   | 122 +++++++++++++++++++
 .../TestAppMasterLauncherWithAmReset.java       |  92 ++++++++++++++
 6 files changed, 269 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
index f481c6a..f92a58d 100644
--- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java
@@ -167,4 +167,16 @@ public interface ResourceKeys {
    */
   String YARN_LOG_INCLUDE_PATTERNS = "yarn.log.include.patterns";
   String YARN_LOG_EXCLUDE_PATTERNS = "yarn.log.exclude.patterns";
+
+  /**
+   * Window of time, in milliseconds, after which the application
+   * master's failure count can be reset to 0.
+   */
+  String YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS  =
+      "yarn.resourcemanager.am.retry-count-window-ms";
+
+  /**
+   * The default window for Slider: 300000 ms (5 minutes).
+   */
+  long DEFAULT_AM_RETRY_COUNT_WINDOW_MS = 300000;
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
index 5f7b5f0..e58178c 100644
--- a/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
+++ b/slider-core/src/main/java/org/apache/slider/core/conf/MapOperations.java
@@ -138,6 +138,21 @@ public class MapOperations implements Map<String, String> {
     String val = getOption(option, Integer.toString(defVal));
     return Integer.decode(val);
   }
+
+  /**
+   * Get a long option; use {@link Long#decode(String)} so as to take hex
+   * and octal values too (binary literals are not supported).
+   *
+   * @param option option name
+   * @param defVal default value, passed to getOption() as a decimal string
+   * @return parsed value
+   * @throws NumberFormatException if the value is not a parsable long
+   */
+  public long getOptionLong(String option, long defVal) {
+    String val = getOption(option, Long.toString(defVal));
+    return Long.decode(val);
+  }
+
   /**
    * Get a mandatory integer option; use {@link Integer#decode(String)} so as to take hex
    * oct and bin values too.

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
index 93aff08..22bf328 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -279,6 +280,30 @@ public abstract class AbstractLauncher extends Configured {
     }
   }
 
+  /**
+   * Extract the AM retry-count window from option
+   * yarn.resourcemanager.am.retry-count-window-ms
+   * and set it as the attempt failures validity interval of the context.
+   * Use {@link ResourceKeys#DEFAULT_AM_RETRY_COUNT_WINDOW_MS} if unset.
+   *
+   * @param submissionContext context to update
+   * @param map resource options; if null, the context is left unchanged
+   */
+  public void extractAmRetryCount(ApplicationSubmissionContext submissionContext,
+                                  Map<String, String> map) {
+
+    if (map != null) {
+      MapOperations options = new MapOperations("", map);
+      long amRetryCountWindow = options.getOptionLong(ResourceKeys
+          .YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+          ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS);
+      log.info("Setting {} to {}",
+          ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+          amRetryCountWindow);
+      submissionContext.setAttemptFailuresValidityInterval(amRetryCountWindow);
+    }
+  }
+
   public void extractLogAggregationContext(Map<String, String> map) {
     if (map != null) {
       String logPatternSepStr = "\\|";
@@ -423,7 +448,7 @@ public abstract class AbstractLauncher extends Configured {
   }
 
   /**
-   * Suubmit an entire directory
+   * Submit an entire directory
    * @param srcDir src path in filesystem
    * @param destRelativeDir relative path under destination local dir
    * @throws IOException IO problems

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
index 06dbfea..c82affa 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
@@ -106,6 +106,8 @@ public class AppMasterLauncher extends AbstractLauncher {
       submissionContext.setApplicationTags(applicationTags);
     }
     submissionContext.setNodeLabelExpression(extractLabelExpression(options));
+
+    extractAmRetryCount(submissionContext, resourceGlobalOptions);
     extractResourceRequirements(resource, options);
     extractLogAggregationContext(resourceGlobalOptions);
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
index bdcf615..6947156 100644
--- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy
@@ -20,10 +20,13 @@ package org.apache.slider.agent.standalone
 
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
+
 import org.apache.hadoop.yarn.api.records.ApplicationReport
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.slider.agent.AgentMiniClusterTestBase
+import org.apache.slider.api.ResourceKeys
 import org.apache.slider.client.SliderClient
 import org.apache.slider.common.SliderXmlConfKeys
 import org.apache.slider.common.params.ActionAMSuicideArgs
@@ -94,6 +97,125 @@ class TestStandaloneAMRestart extends AgentMiniClusterTestBase {
     assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
   }
 
+
+  @Test
+  public void testStandaloneAMRestartWithRetryWindow() throws Throwable {
+    describe "kill a Standalone AM and verify that the AM failure count " +
+        "is reset after the AM retry-count-window elapses"
+    // patch the configuration for AM restart
+    YarnConfiguration conf = getRestartableConfiguration(5)
+
+    int restartLimit = 3;
+    int amRetryWindow = 60000;
+    String amRetryWindowStr = amRetryWindow.toString()
+    String clustername = createMiniCluster("", conf, 1, true)
+    ServiceLauncher<SliderClient> launcher =
+        createStandaloneAMWithArgs(clustername,
+            [
+                Arguments.ARG_DEFINE,
+                SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit,
+                Arguments.ARG_RESOURCE_OPT,
+                ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+                amRetryWindowStr
+            ],
+            true,
+            false)
+    SliderClient sliderClient = launcher.service
+    addToTeardown(sliderClient);
+
+    ApplicationReport report = waitForClusterLive(sliderClient)
+    logReport(report)
+    waitUntilClusterLive(sliderClient, 30000)
+
+    def diagnosticArgs = new ActionDiagnosticArgs()
+    diagnosticArgs.client = true
+    diagnosticArgs.yarn = true
+    sliderClient.actionDiagnostic(diagnosticArgs)
+
+    describe "kill AM #1"
+    int iteration = 1;  // pre-incremented before each later kill: 1, 2, 3
+    killAMAndWaitForRestart(sliderClient, iteration, clustername)
+
+    describe "kill AM #2"
+    killAMAndWaitForRestart(sliderClient, ++iteration, clustername)
+
+    // two failures so far, below restartLimit: app should be running here
+    assert 0 == sliderClient.actionExists(clustername, true)
+
+    // wait out the retry-count window so the earlier failures stop counting
+    describe "sleeping to ensure reset window elapsed"
+    sleep(amRetryWindow)
+
+    // kill again & expect the app to still be running
+    describe "kill AM #3 after window elapsed"
+    killAMAndWaitForRestart(sliderClient, ++iteration, clustername)
+    assert 0 == sliderClient.actionExists(clustername, true)
+
+    report = sliderClient.applicationReport
+    assert report.getYarnApplicationState() == YarnApplicationState.RUNNING
+
+    logReport(report)
+    describe("stopping the cluster")
+    assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
+
+    report = sliderClient.applicationReport
+    assert report.finalApplicationStatus == FinalApplicationStatus.KILLED
+  }
+
+
+  @Test
+  public void testStandaloneAMRestartWithDefaultRetryWindow() throws Throwable {
+    describe "kill AM more than the max limit allowed within the AM " +
+        "retry-count-window and expect the app to fail"
+    // patch the configuration for AM restart
+    YarnConfiguration conf = getRestartableConfiguration(5)
+
+    int restartLimit = 3;
+    String clustername = createMiniCluster("", conf, 1, true)
+    ServiceLauncher<SliderClient> launcher =
+        createStandaloneAMWithArgs(clustername,
+            [
+                Arguments.ARG_DEFINE,
+                SliderXmlConfKeys.KEY_AM_RESTART_LIMIT + "=" + restartLimit,
+            ],
+            true,
+            false)
+    SliderClient sliderClient = launcher.service
+    addToTeardown(sliderClient);
+
+    ApplicationReport report = waitForClusterLive(sliderClient)
+    logReport(report)
+    waitUntilClusterLive(sliderClient, 30000)
+
+    def diagnosticArgs = new ActionDiagnosticArgs()
+    diagnosticArgs.client = true
+    diagnosticArgs.yarn = true
+    sliderClient.actionDiagnostic(diagnosticArgs)
+
+    describe "kill AM #1"
+    int iteration = 1;  // pre-incremented before each later kill: 1, 2, 3
+    killAMAndWaitForRestart(sliderClient, iteration, clustername)
+
+    describe "kill AM #2"
+    killAMAndWaitForRestart(sliderClient, ++iteration, clustername)
+
+    // two failures so far, below restartLimit: app should be running here
+    assert 0 == sliderClient.actionExists(clustername, true)
+
+    // third kill inside the default 300000 ms window hits restartLimit: fail
+    describe "kill AM #3"
+    killAmAndWaitForDeath(sliderClient, ++iteration, clustername)
+    sleep(40000)
+
+    report = sliderClient.applicationReport
+    assert report.finalApplicationStatus == FinalApplicationStatus.FAILED
+
+    logReport(report)
+    describe("stopping the cluster")
+    assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true)
+  }
+
+
   /**
    * Kill an AM. take an iteration count for the message sent to the 
    * AM (hence its logs)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3fb79d/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
new file mode 100644
index 0000000..cc64cab
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.launch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.slider.api.ResourceKeys;
+import org.apache.slider.client.SliderYarnClientImpl;
+import org.apache.slider.common.SliderKeys;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Verifies that AppMasterLauncher applies the AM retry-count window option. */
+public class TestAppMasterLauncherWithAmReset {
+  SliderYarnClientImpl mockYarnClient;
+  YarnClientApplication yarnClientApp;
+  GetNewApplicationResponse newApp;
+  Set<String> tags = Collections.emptySet();
+  AppMasterLauncher appMasterLauncher = null;
+
+  @Before
+  public void initialize() throws Exception {
+    mockYarnClient = EasyMock.createNiceMock(SliderYarnClientImpl.class);
+    yarnClientApp = EasyMock.createNiceMock(YarnClientApplication.class);
+    newApp = EasyMock.createNiceMock(GetNewApplicationResponse.class);
+    // return a real submission context so the tests can read back its values
+    EasyMock.expect(mockYarnClient.createApplication())
+        .andReturn(new YarnClientApplication(newApp,
+        Records.newRecord(ApplicationSubmissionContext.class)));
+  }
+
+  @Test
+  public void testExtractYarnResourceManagerAmRetryCountWindowMs() throws
+      Exception {
+    Map<String, String> options = new HashMap<String, String>();
+    final String expectedInterval = Integer.toString(120000);
+    options.put(ResourceKeys.YARN_RESOURCEMANAGER_AM_RETRY_COUNT_WINDOW_MS,
+        expectedInterval);
+    EasyMock.replay(mockYarnClient, yarnClientApp);
+
+    appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
+        null, mockYarnClient, false, null, options, tags);
+
+    ApplicationSubmissionContext ctx = appMasterLauncher.application
+        .getApplicationSubmissionContext();
+    String retryIntervalWindow = Long.toString(ctx
+        .getAttemptFailuresValidityInterval());
+    Assert.assertEquals(expectedInterval, retryIntervalWindow);
+  }
+
+  @Test
+  public void testExtractYarnResourceManagerAmRetryCountWindowMsDefaultValue()
+      throws Exception {
+    Map<String, String> options = new HashMap<String, String>();
+    EasyMock.replay(mockYarnClient, yarnClientApp);
+
+    appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
+        null, mockYarnClient, false, null, options, tags);
+
+    ApplicationSubmissionContext ctx = appMasterLauncher.application
+        .getApplicationSubmissionContext();
+    long retryIntervalWindow = ctx.getAttemptFailuresValidityInterval();
+    Assert.assertEquals(ResourceKeys.DEFAULT_AM_RETRY_COUNT_WINDOW_MS,
+        retryIntervalWindow);
+  }
+
+}