You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/30 17:36:52 UTC
[03/50] [abbrv] git commit: SLIDER-126. Slider-Agent and Agent
Provider need to handle component instance install/start failure gracefully
SLIDER-126. Slider-Agent and Agent Provider need to handle component instance install/start failure gracefully
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/3aca57d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/3aca57d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/3aca57d2
Branch: refs/heads/feature/SLIDER-151_Implement_full_slider_API_in_REST_and_switch_client_to_it
Commit: 3aca57d2c0d8e06fc720b023583d5a7ce1f4273e
Parents: 7003c06
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Sat Jun 21 18:09:42 2014 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Sat Jun 21 18:09:42 2014 -0700
----------------------------------------------------------------------
slider-agent/conf/agent.ini | 2 +
.../src/main/python/agent/AgentConfig.py | 21 +++
slider-agent/src/main/python/agent/Constants.py | 3 +
.../src/main/python/agent/Controller.py | 23 ++-
slider-agent/src/main/python/agent/main.py | 4 +
.../src/test/python/agent/TestController.py | 30 ++++
slider-agent/src/test/python/agent/TestMain.py | 5 +-
.../providers/AbstractProviderService.java | 15 +-
.../slider/providers/ProviderService.java | 5 +-
.../slider/providers/agent/AgentKeys.java | 3 +
.../providers/agent/AgentLaunchParameter.java | 130 ++++++++++++++
.../providers/agent/AgentProviderService.java | 179 ++++++++++++++++---
.../slider/providers/agent/AgentRoles.java | 18 +-
.../apache/slider/providers/agent/Command.java | 13 +-
.../slider/providers/agent/CommandResult.java | 16 +-
.../providers/agent/ComponentInstanceState.java | 24 +++
.../slider/providers/agent/ContainerState.java | 41 +++++
.../providers/agent/HeartbeatMonitor.java | 116 ++++++++++++
.../server/appmaster/AMViewForProviders.java | 27 +++
.../server/appmaster/SliderAppMaster.java | 32 +++-
.../slider/server/appmaster/state/AppState.java | 20 +++
.../test_command_log/appConfig_fast_no_reg.json | 29 +++
.../test_command_log/appConfig_no_hb.json | 29 +++
.../model/mock/MockProviderService.groovy | 4 +-
.../agent/TestAgentLaunchParameter.java | 76 ++++++++
.../providers/agent/TestHeartbeatMonitor.java | 136 ++++++++++++++
.../publisher/TestAgentProviderService.java | 5 +-
.../lifecycle/AgentCommandTestBase.groovy | 100 ++++++++---
.../lifecycle/TestAgentClusterLifecycle.groovy | 2 +-
.../funtest/lifecycle/TestAgentFailures.groovy | 103 +++++++++++
.../funtest/lifecycle/TestAgentFailures2.groovy | 103 +++++++++++
.../lifecycle/TestAppsThroughAgent.groovy | 90 ++++------
32 files changed, 1253 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/conf/agent.ini
----------------------------------------------------------------------
diff --git a/slider-agent/conf/agent.ini b/slider-agent/conf/agent.ini
index 87d73a7..b52bec9 100644
--- a/slider-agent/conf/agent.ini
+++ b/slider-agent/conf/agent.ini
@@ -25,6 +25,8 @@ heartbeat_path=/ws/v1/slider/agents/{name}/heartbeat
app_pkg_dir=app/definition
app_install_dir=app/install
app_run_dir=app/run
+app_dbg_cmd=
+debug_mode_enabled=true
app_task_dir=app/command-log
app_log_dir=app/log
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/src/main/python/agent/AgentConfig.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/AgentConfig.py b/slider-agent/src/main/python/agent/AgentConfig.py
index e0981f6..91adfdd 100644
--- a/slider-agent/src/main/python/agent/AgentConfig.py
+++ b/slider-agent/src/main/python/agent/AgentConfig.py
@@ -21,6 +21,9 @@ limitations under the License.
import ConfigParser
import StringIO
import os
+import logging
+
+logger = logging.getLogger()
config = ConfigParser.RawConfigParser()
content = """
@@ -37,6 +40,8 @@ heartbeat_path=/ws/v1/slider/agents/{name}/heartbeat
app_pkg_dir=app/definition
app_install_dir=app/install
app_run_dir=app/run
+app_dbg_cmd=
+debug_mode_enabled=true
app_task_dir=app/command-log
app_log_dir=app/log
@@ -79,6 +84,10 @@ class AgentConfig:
APP_INSTALL_DIR = "app_install_dir"
# the location to store component instance PID directories
APP_RUN_DIR = "app_run_dir"
+ # debug hint for agents
+ APP_DBG_CMD = "app_dbg_cmd"
+ # allow agent to operate in debug mode
+ DEBUG_MODE_ENABLED = "debug_mode_enabled"
# run time dir for command executions
APP_TASK_DIR = "app_task_dir"
@@ -137,6 +146,18 @@ class AgentConfig:
global config
return config.get(category, name)
+ def isDebugEnabled(self):
+ global config
+ enabled = config.get(AgentConfig.AGENT_SECTION, AgentConfig.DEBUG_MODE_ENABLED)
+ return enabled == "true";
+
+ def debugCommand(self):
+ global config
+ command = config.get(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD)
+ if command == None:
+ return ""
+ return command
+
def set(self, category, name, value):
global config
return config.set(category, name, value)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/src/main/python/agent/Constants.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Constants.py b/slider-agent/src/main/python/agent/Constants.py
index b937cd2..88cd564 100644
--- a/slider-agent/src/main/python/agent/Constants.py
+++ b/slider-agent/src/main/python/agent/Constants.py
@@ -27,3 +27,6 @@ ALLOCATED_PORTS = "allocated_ports"
FOLDERS = "folders"
AGENT_WORK_ROOT = "AGENT_WORK_ROOT"
AGENT_LOG_ROOT = "AGENT_LOG_ROOT"
+DO_NOT_REGISTER = "DO_NOT_REGISTER"
+DO_NOT_HEARTBEAT = "DO_NOT_HEARTBEAT"
+DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py
index fe5760d..b5dca92 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -36,6 +36,7 @@ from ActionQueue import ActionQueue
from NetUtil import NetUtil
import ssl
import ProcessHelper
+import Constants
logger = logging.getLogger()
@@ -84,10 +85,29 @@ class Controller(threading.Thread):
logger.info("Server connection disconnected.")
pass
+ def processDebugCommandForRegister(self):
+ self.processDebugCommand(Constants.DO_NOT_REGISTER)
+ pass
+
+ def processDebugCommandForHeartbeat(self):
+ self.processDebugCommand(Constants.DO_NOT_HEARTBEAT)
+ pass
+
+ def processDebugCommand(self, command):
+ if self.config.isDebugEnabled() and self.config.debugCommand() == command:
+ ## Test support - sleep for 10 minutes
+ logger.info("Received debug command: "
+ + self.config.debugCommand() + " Sleeping for 10 minutes")
+ time.sleep(60*10)
+ pass
+ pass
+
def registerWithServer(self):
id = -1
ret = {}
+ self.processDebugCommandForRegister()
+
while not self.isRegistered:
try:
data = json.dumps(self.register.build(id))
@@ -170,7 +190,8 @@ class Controller(threading.Thread):
retry = False
certVerifFailed = False
- id = 0
+ self.processDebugCommandForHeartbeat()
+
while not self.DEBUG_STOP_HEARTBEATING:
if self.shouldStopAgent():
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/src/main/python/agent/main.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/main.py b/slider-agent/src/main/python/agent/main.py
index afe3595..3632157 100644
--- a/slider-agent/src/main/python/agent/main.py
+++ b/slider-agent/src/main/python/agent/main.py
@@ -176,6 +176,7 @@ def main():
parser.add_option("-l", "--label", dest="label", help="label of the agent", default=None)
parser.add_option("--host", dest="host", help="AppMaster host", default=None)
parser.add_option("--port", dest="port", help="AppMaster port", default=None)
+ parser.add_option("--debug", dest="debug", help="Agent debug hint", default="")
(options, args) = parser.parse_args()
if not 'AGENT_WORK_ROOT' in os.environ:
@@ -200,6 +201,9 @@ def main():
if options.port:
agentConfig.set(AgentConfig.SERVER_SECTION, "port", options.port)
+ if options.debug:
+ agentConfig.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, options.debug)
+
logFile = os.path.join(agentConfig.getResolvedPath(AgentConfig.LOG_DIR), logFileName)
perform_prestart_checks(agentConfig)
ensure_folder_layout(agentConfig)
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/src/test/python/agent/TestController.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py
index 8dc7458..8c671da 100644
--- a/slider-agent/src/test/python/agent/TestController.py
+++ b/slider-agent/src/test/python/agent/TestController.py
@@ -557,6 +557,36 @@ class TestController(unittest.TestCase):
 #Controller thread and the agent stop if the repeatRegistration flag is False
self.assertFalse(self.controller.repeatRegistration)
+ @patch("time.sleep")
+ def test_debugSetupForRegister(self, sleepMock):
+ original_value = self.controller.config
+ self.controller.config = AgentConfig("", "")
+ self.controller.config.set(AgentConfig.AGENT_SECTION, AgentConfig.DEBUG_MODE_ENABLED, "true")
+ self.controller.processDebugCommandForRegister()
+ self.controller.processDebugCommandForHeartbeat()
+ assert not sleepMock.called, 'sleep should not have been called'
+
+ self.controller.config.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, "DO_NOT_RERISTER")
+ self.controller.config.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, "DO_NOT_HEARTBEET")
+ self.controller.processDebugCommandForRegister()
+ self.controller.processDebugCommandForHeartbeat()
+ assert not sleepMock.called, 'sleep should not have been called'
+
+ self.controller.config.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, "DO_NOT_REGISTER")
+ self.controller.processDebugCommandForRegister()
+ assert sleepMock.called, 'sleep should have been called'
+
+ self.controller.processDebugCommandForHeartbeat()
+ assert sleepMock.call_count == 1, 'sleep should have been called once'
+
+ self.controller.config.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, "DO_NOT_HEARTBEAT")
+ self.controller.processDebugCommandForHeartbeat()
+ assert sleepMock.call_count == 2, 'sleep should have been called twice'
+
+ self.controller.config = original_value
+ pass
+
+
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s %(message)s',level=logging.DEBUG)
unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-agent/src/test/python/agent/TestMain.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestMain.py b/slider-agent/src/test/python/agent/TestMain.py
index 5273623..179d1b4 100644
--- a/slider-agent/src/test/python/agent/TestMain.py
+++ b/slider-agent/src/test/python/agent/TestMain.py
@@ -259,11 +259,12 @@ class TestMain(unittest.TestCase):
self.assertTrue(start_mock.called)
class AgentOptions:
- def __init__(self, label, host, port, verbose):
+ def __init__(self, label, host, port, verbose, debug):
self.label = label
self.host = host
self.port = port
self.verbose = verbose
+ self.debug = debug
@patch.object(main, "setup_logging")
@patch.object(main, "bind_signal_handlers")
@@ -289,7 +290,7 @@ class TestMain(unittest.TestCase):
Controller_init_mock.return_value = None
isAlive_mock.return_value = False
parse_args_mock.return_value = (
- TestMain.AgentOptions("agent", "host1", "8080", True), [])
+ TestMain.AgentOptions("agent", "host1", "8080", True, ""), [])
tmpdir = tempfile.gettempdir()
#testing call without command-line arguments
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index 7c9b38e..bbad90f 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -30,6 +30,7 @@ import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.registry.info.RegisteredEndpoint;
import org.apache.slider.core.registry.info.ServiceInstanceData;
+import org.apache.slider.server.appmaster.AMViewForProviders;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.services.registry.RegistryViewForProviders;
@@ -66,6 +67,7 @@ public abstract class AbstractProviderService
protected AgentRestOperations restOps;
protected RegistryViewForProviders registry;
protected ServiceInstanceData registryInstanceData;
+ protected AMViewForProviders amView;
protected URL amWebAPI;
public AbstractProviderService(String name) {
@@ -81,15 +83,20 @@ public abstract class AbstractProviderService
return amState;
}
+ /**
+ * Get the application master view that was handed to this provider in
+ * {@code bind}.
+ * @return the stored AM view; the field has no initializer, so this is
+ * null if {@code bind} has not been called
+ */
+ public AMViewForProviders getAppMaster() {
+ return amView;
+ }
+
public void setAmState(StateAccessForProviders amState) {
this.amState = amState;
}
@Override
public void bind(StateAccessForProviders stateAccessor,
- RegistryViewForProviders reg) {
+ RegistryViewForProviders reg, AMViewForProviders amView) {
this.amState = stateAccessor;
this.registry = reg;
+ this.amView = amView;
}
@Override
@@ -129,16 +136,16 @@ public abstract class AbstractProviderService
/**
* No-op implementation of this method.
- *
+ *
* {@inheritDoc}
*/
@Override
public void validateApplicationConfiguration(AggregateConf instance,
File confDir,
boolean secure) throws
- IOException,
+ IOException,
SliderException {
-
+
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index d77135c..1778197 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -31,6 +31,7 @@ import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.registry.info.ServiceInstanceData;
+import org.apache.slider.server.appmaster.AMViewForProviders;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.services.registry.RegistryViewForProviders;
@@ -138,9 +139,11 @@ public interface ProviderService extends ProviderCore, Service,
* bind operation -invoked before the service is started
* @param stateAccessor interface offering read access to the state
* @param registry
+ * @param amView
*/
void bind(StateAccessForProviders stateAccessor,
- RegistryViewForProviders registry);
+ RegistryViewForProviders registry,
+ AMViewForProviders amView);
/**
* Returns the agent rest operations interface.
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
index 7136fd9..dea39ea 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -71,6 +71,7 @@ public interface AgentKeys {
String ARG_LABEL = "--label";
String ARG_HOST = "--host";
String ARG_PORT = "--port";
+ String ARG_DEBUG = "--debug";
String AGENT_MAIN_SCRIPT_ROOT = "./infra/agent/slider-agent/";
String AGENT_MAIN_SCRIPT = "agent/main.py";
@@ -88,6 +89,8 @@ public interface AgentKeys {
String COMPONENT_SCRIPT = "role.script";
String WAIT_HEARTBEAT = "wait.heartbeat";
String PYTHON_EXE = "python";
+ String HEARTBEAT_MONITOR_INTERVAL = "heartbeat.monitor.interval";
+ String AGENT_INSTANCE_DEBUG_DATA = "agent.instance.debug.data";
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java
new file mode 100644
index 0000000..b839e58
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentLaunchParameter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Tracks the sequence of launch (debug) parameters to hand out to agents,
+ * per component name, as parsed from a single specification string.
+ */
+class AgentLaunchParameter {
+  public static final Logger log =
+      LoggerFactory.getLogger(AgentLaunchParameter.class);
+  private static final String DEFAULT_PARAMETER = "";
+  private static final String ANY_COMPONENT = "ANY";
+  private static final String NONE_VALUE = "NONE";
+  private final Map<String, CommandTracker> launchParameterTracker;
+
+  /**
+   * Build the per-component trackers from a specification string.
+   *
+   * @param parameters specification in the format accepted by
+   *                   {@link #parseExpectedLaunchParameters(String)};
+   *                   null or empty means no component gets a parameter
+   */
+  public AgentLaunchParameter(String parameters) {
+    launchParameterTracker = parseExpectedLaunchParameters(parameters);
+  }
+
+  /**
+   * Get the next launch parameter for a component. A tracker registered
+   * under the component's own name takes precedence over the ANY tracker.
+   *
+   * @param componentName name of the component being launched
+   *
+   * @return the next parameter in that tracker's sequence, or the empty
+   *         string when no tracker applies
+   */
+  public String getNextLaunchParameter(String componentName) {
+    if (launchParameterTracker == null) {
+      return DEFAULT_PARAMETER;
+    }
+
+    CommandTracker tracker = launchParameterTracker.get(componentName);
+    if (tracker == null) {
+      tracker = launchParameterTracker.get(ANY_COMPONENT);
+    }
+    if (tracker == null) {
+      return DEFAULT_PARAMETER;
+    }
+
+    // The tracker advances its position on every call, so serialize access.
+    synchronized (this) {
+      return tracker.getNextCommand();
+    }
+  }
+
+  /**
+   * Parse launch parameters of the form ANY:PARAM_FOR_FIRST:PARAM_FOR_SECOND:...:PARAM_FOR_REST|HBASE_MASTER:...
+   *
+   * E.g. ANY:DO_NOT_REGISTER:DO_NOT_HEARTBEAT:NONE For any container, first one gets DO_NOT_REGISTER second one gets
+   * DO_NOT_HEARTBEAT, then all of the rest get nothing
+   *
+   * E.g. HBASE_MASTER:FAIL_AFTER_START:NONE For HBASE_MASTER, first one gets FAIL_AFTER_START then "" for all
+   *
+   * Entries without a component name, or without at least one parameter
+   * after the name, are ignored.
+   *
+   * @param launchParameters the specification string; may be null or empty
+   *
+   * @return trackers keyed by component name, or null when no entry
+   *         could be parsed
+   */
+  Map<String, CommandTracker> parseExpectedLaunchParameters(String launchParameters) {
+    if (launchParameters == null || launchParameters.isEmpty()) {
+      return null;
+    }
+
+    Map<String, CommandTracker> trackers = null;
+    for (String componentSpec : launchParameters.split(Pattern.quote("|"))) {
+      if (componentSpec.isEmpty()) {
+        continue;
+      }
+
+      String[] tokens = componentSpec.split(Pattern.quote(":"));
+      if (tokens.length < 2 || tokens[0].isEmpty()) {
+        continue;
+      }
+
+      // Everything after the component name is the command sequence;
+      // NONE is the spelled-out form of the empty parameter.
+      String[] commands = Arrays.copyOfRange(tokens, 1, tokens.length);
+      for (int i = 0; i < commands.length; i++) {
+        if (NONE_VALUE.equals(commands[i])) {
+          commands[i] = DEFAULT_PARAMETER;
+        }
+      }
+
+      if (trackers == null) {
+        trackers = new HashMap<>(10);
+      }
+      trackers.put(tokens[0], new CommandTracker(commands));
+    }
+
+    return trackers;
+  }
+
+  /**
+   * Hands out launch commands in order and keeps returning the last one
+   * once the sequence is exhausted.
+   */
+  class CommandTracker {
+    private final int maxIndex;
+    private final String[] launchCommands;
+    private int currentIndex;
+
+    CommandTracker(String[] launchCommands) {
+      this.launchCommands = launchCommands;
+      this.maxIndex = launchCommands.length - 1;
+      this.currentIndex = 0;
+    }
+
+    String getNextCommand() {
+      String command = launchCommands[currentIndex];
+      // Advance, but never past the final entry.
+      currentIndex = Math.min(currentIndex + 1, maxIndex);
+      return command;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 5ac142b..85945d8 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -91,7 +91,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS;
-/** This class implements the server-side aspects of an agent deployment */
+/** This class implements the server-side logic for application deployment
+ * through Slider application package
+ **/
public class AgentProviderService extends AbstractProviderService implements
ProviderCore,
AgentKeys,
@@ -106,13 +108,16 @@ public class AgentProviderService extends AbstractProviderService implements
private static final String GLOBAL_CONFIG_TAG = "global";
private static final String LOG_FOLDERS_TAG = "LogFolders";
private static final int MAX_LOG_ENTRIES = 20;
+ private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000;
private final Object syncLock = new Object();
private final Map<String, String> allocatedPorts = new ConcurrentHashMap<>();
+ private int heartbeatMonitorInterval = 0;
private AgentClientProvider clientProvider;
- private Map<String, ComponentInstanceState> componentStatuses = new HashMap<>();
+ private Map<String, ComponentInstanceState> componentStatuses = new ConcurrentHashMap<>();
private AtomicInteger taskId = new AtomicInteger(0);
private volatile Metainfo metainfo = null;
private ComponentCommandOrder commandOrder = null;
+ private HeartbeatMonitor monitor;
private Map<String, String> workFolders =
Collections.synchronizedMap(new LinkedHashMap<String, String>(MAX_LOG_ENTRIES, 0.75f, false) {
protected boolean removeEldestEntry(Map.Entry eldest) {
@@ -120,10 +125,15 @@ public class AgentProviderService extends AbstractProviderService implements
}
});
private Boolean canAnyMasterPublish = null;
+ private AgentLaunchParameter agentLaunchParameter = null;
+ /**
+ * Create an instance of AgentProviderService
+ */
public AgentProviderService() {
super("AgentProviderService");
setAgentRestOperations(this);
+ setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
}
@Override
@@ -170,6 +180,9 @@ public class AgentProviderService extends AbstractProviderService implements
if (metainfo == null) {
synchronized (syncLock) {
if (metainfo == null) {
+ readAndSetHeartbeatMonitoringInterval(instanceDefinition);
+ initializeAgentDebugCommands(instanceDefinition);
+
metainfo = getApplicationMetainfo(fileSystem, appDef);
if (metainfo == null || metainfo.getServices() == null || metainfo.getServices().size() == 0) {
log.error("metainfo.xml is unavailable or malformed at {}.", appDef);
@@ -177,6 +190,8 @@ public class AgentProviderService extends AbstractProviderService implements
}
commandOrder = new ComponentCommandOrder(metainfo.getServices().get(0).getCommandOrder());
+ monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval());
+ monitor.start();
}
}
}
@@ -246,6 +261,12 @@ public class AgentProviderService extends AbstractProviderService implements
operation.add(ARG_PORT);
operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_WEB_PORT));
+ String debugCmd = agentLaunchParameter.getNextLaunchParameter(role);
+ if (debugCmd != null && debugCmd.length() != 0) {
+ operation.add(ARG_DEBUG);
+ operation.add(debugCmd);
+ }
+
launcher.addCommand(operation.build());
// initialize the component instance state
@@ -256,15 +277,66 @@ public class AgentProviderService extends AbstractProviderService implements
getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME)));
}
+  /**
+   * Reads the heartbeat monitoring interval from the global option
+   * {@link AgentKeys#HEARTBEAT_MONITOR_INTERVAL} and applies it. If the
+   * configured value is not a valid integer, a warning is logged and the
+   * default interval is applied instead.
+   * @param instanceDefinition instance definition whose application configuration is read
+   */
+  private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) {
+    String hbMonitorInterval = instanceDefinition.getAppConfOperations().
+        getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL,
+                                     Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL));
+    try {
+      setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval));
+    } catch (NumberFormatException e) {
+      // Three arguments need three placeholders; the default value was
+      // previously dropped from the message.
+      log.warn(
+          "Bad value {} for {}. Defaulting to {}",
+          hbMonitorInterval,
+          HEARTBEAT_MONITOR_INTERVAL,
+          DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
+      // Apply the default explicitly so the log message is always true,
+      // whatever interval was set on this instance earlier.
+      setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
+    }
+  }
+
+ /**
+ * Reads the agent debug launch parameters from the global option
+ * {@link AgentKeys#AGENT_INSTANCE_DEBUG_DATA} (empty string when the option is absent)
+ * and builds the {@link AgentLaunchParameter} that supplies the debug command for each launched agent.
+ * @param instanceDefinition instance definition whose application configuration is read
+ */
+ private void initializeAgentDebugCommands(AggregateConf instanceDefinition) {
+ String launchParameterStr = instanceDefinition.getAppConfOperations().
+ getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, "");
+ agentLaunchParameter = new AgentLaunchParameter(launchParameterStr);
+ }
+
+ @VisibleForTesting
protected Metainfo getMetainfo() {
return this.metainfo;
}
+ /**
+ * Expose the map of component instance states, keyed by agent label.
+ * The map itself is returned, not a copy.
+ * @return the component instance state map
+ */
+ @VisibleForTesting
+ protected Map<String, ComponentInstanceState> getComponentStatuses() {
+ return componentStatuses;
+ }
+
+ @VisibleForTesting
protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
String appDef) throws IOException {
return AgentUtils.getApplicationMetainfo(fileSystem, appDef);
}
+ /**
+ * Set the interval that is passed to the heartbeat monitor when it is created.
+ * @param heartbeatMonitorInterval the interval; presumably milliseconds, given the
+ * 60 * 1000 default -- TODO confirm against HeartbeatMonitor
+ */
+ @VisibleForTesting
+ protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) {
+ this.heartbeatMonitorInterval = heartbeatMonitorInterval;
+ }
+
+ /**
+ * Get the interval that is passed to the heartbeat monitor when it is created.
+ * @return the currently configured heartbeat monitor interval
+ */
+ private int getHeartbeatMonitorInterval() {
+ return this.heartbeatMonitorInterval;
+ }
+
+ /**
+ * Publish a named config bag that may contain name-value pairs for app configurations such as hbase-site
+ * @param name
+ * @param description
+ * @param entries
+ */
protected void publishComponentConfiguration(String name, String description,
Iterable<Map.Entry<String, String>> entries) {
PublishedConfiguration pubconf = new PublishedConfiguration();
@@ -274,6 +346,10 @@ public class AgentProviderService extends AbstractProviderService implements
getAmState().getPublishedSliderConfigurations().put(name, pubconf);
}
+ /**
+ * Get a list of all hosts for all role/container per role
+ * @return
+ */
protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
amState.refreshClusterStatus();
return (Map<String, Map<String, ClusterNode>>)
@@ -293,6 +369,25 @@ public class AgentProviderService extends AbstractProviderService implements
}
/**
+ * Lost heartbeat from the container - release it and ask for a replacement
+ *
+ * @param label
+ *
+ * @return if release is requested successfully
+ */
+  protected boolean releaseContainer(String label) {
+    // Drop the tracked state for this label before asking the AM to act.
+    componentStatuses.remove(label);
+    try {
+      getAppMaster().refreshContainer(getContainerId(label), true);
+    } catch (SliderException e) {
+      // A failed release request is reported through the return value, not
+      // rethrown. Log it as a warning and pass the exception as the last
+      // argument so the stack trace is kept.
+      log.warn("Error while requesting container release for {}. Message: {}", label, e.getMessage(), e);
+      return false;
+    }
+
+    return true;
+  }
+
+ /**
* Run this service
*
* @param instanceDefinition component description
@@ -329,12 +424,18 @@ public class AgentProviderService extends AbstractProviderService implements
return true;
}
+ /**
+ * Handle registration calls from the agents
+ * @param registration
+ * @return
+ */
@Override
public RegistrationResponse handleRegistration(Register registration) {
RegistrationResponse response = new RegistrationResponse();
String label = registration.getHostname();
if (componentStatuses.containsKey(label)) {
response.setResponseStatus(RegistrationStatus.OK);
+ componentStatuses.get(label).setLastHeartbeat(System.currentTimeMillis());
} else {
response.setResponseStatus(RegistrationStatus.FAILED);
response.setLog("Label not recognized.");
@@ -342,31 +443,11 @@ public class AgentProviderService extends AbstractProviderService implements
return response;
}
- private Command getCommand(String commandVal) {
- if (commandVal.equals(Command.START.toString())) {
- return Command.START;
- }
- if (commandVal.equals(Command.INSTALL.toString())) {
- return Command.INSTALL;
- }
-
- return Command.NOP;
- }
-
- private CommandResult getCommandResult(String commandResVal) {
- if (commandResVal.equals(CommandResult.COMPLETED.toString())) {
- return CommandResult.COMPLETED;
- }
- if (commandResVal.equals(CommandResult.FAILED.toString())) {
- return CommandResult.FAILED;
- }
- if (commandResVal.equals(CommandResult.IN_PROGRESS.toString())) {
- return CommandResult.IN_PROGRESS;
- }
-
- throw new IllegalArgumentException("Unrecognized value " + commandResVal);
- }
-
+ /**
+ * Handle heartbeat response from agents
+ * @param heartBeat
+ * @return
+ */
@Override
public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) {
HeartBeatResponse response = new HeartBeatResponse();
@@ -391,6 +472,7 @@ public class AgentProviderService extends AbstractProviderService implements
Boolean isMaster = isMaster(roleName);
ComponentInstanceState componentStatus = componentStatuses.get(label);
+ componentStatus.setLastHeartbeat(System.currentTimeMillis());
 // If no Master can explicitly publish then publish if it's a master
// Otherwise, wait till the master that can publish is ready
if (isMaster &&
@@ -408,8 +490,8 @@ public class AgentProviderService extends AbstractProviderService implements
this.allocatedPorts.put(port.getKey(), port.getValue());
}
}
- CommandResult result = getCommandResult(report.getStatus());
- Command command = getCommand(report.getRoleCommand());
+ CommandResult result = CommandResult.getCommandResult(report.getStatus());
+ Command command = Command.getCommand(report.getRoleCommand());
componentStatus.applyCommandResult(result, command);
log.info("Component operation. Status: {}", result);
@@ -461,6 +543,12 @@ public class AgentProviderService extends AbstractProviderService implements
return response;
}
+ /**
+ * Format the folder locations before publishing in the registry service
+ * @param folders
+ * @param containerId
+ * @param hostFqdn
+ */
private void processFolderPaths(Map<String, String> folders, String containerId, String hostFqdn) {
for (String key : folders.keySet()) {
workFolders.put(String.format("%s-%s-%s", hostFqdn, containerId, key), folders.get(key));
@@ -469,6 +557,11 @@ public class AgentProviderService extends AbstractProviderService implements
publishComponentConfiguration(LOG_FOLDERS_TAG, LOG_FOLDERS_TAG, (new HashMap<>(this.workFolders)).entrySet());
}
+ /**
+ * Process return status for component instances
+ * @param heartBeat
+ * @param componentStatus
+ */
protected void processReturnedStatus(HeartBeat heartBeat, ComponentInstanceState componentStatus) {
List<ComponentStatus> statuses = heartBeat.getComponentStatus();
if (statuses != null && !statuses.isEmpty()) {
@@ -529,6 +622,11 @@ public class AgentProviderService extends AbstractProviderService implements
}
}
+ /**
+ * Extract script path from the application metainfo
+ * @param roleName
+ * @return
+ */
protected String getScriptPathFromMetainfo(String roleName) {
String scriptPath = null;
List<Service> services = getMetainfo().getServices();
@@ -545,6 +643,11 @@ public class AgentProviderService extends AbstractProviderService implements
return scriptPath;
}
+ /**
+ * Is the role of type MASTER
+ * @param roleName
+ * @return
+ */
protected boolean isMaster(String roleName) {
List<Service> services = getMetainfo().getServices();
if (services.size() != 1) {
@@ -564,6 +667,11 @@ public class AgentProviderService extends AbstractProviderService implements
return false;
}
+ /**
+ * Can the role publish configuration
+ * @param roleName
+ * @return
+ */
protected boolean canPublishConfig(String roleName) {
List<Service> services = getMetainfo().getServices();
if (services.size() != 1) {
@@ -579,6 +687,10 @@ public class AgentProviderService extends AbstractProviderService implements
return false;
}
+ /**
+ * Can any master publish config explicitly, if not a random master is used
+ * @return
+ */
protected boolean canAnyMasterPublishConfig() {
if (canAnyMasterPublish == null) {
List<Service> services = getMetainfo().getServices();
@@ -609,6 +721,15 @@ public class AgentProviderService extends AbstractProviderService implements
return label.substring(0, label.indexOf(LABEL_MAKER));
}
+ /**
+ * Add install command to the heartbeat response
+ * @param roleName
+ * @param containerId
+ * @param response
+ * @param scriptPath
+ * @throws SliderException
+ */
+ @VisibleForTesting
protected void addInstallCommand(String roleName, String containerId, HeartBeatResponse response, String scriptPath)
throws SliderException {
assert getAmState().isApplicationLive();
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/AgentRoles.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentRoles.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentRoles.java
index d8aefc6..281895a 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentRoles.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentRoles.java
@@ -18,7 +18,6 @@
package org.apache.slider.providers.agent;
-import org.apache.slider.common.SliderKeys;
import org.apache.slider.providers.ProviderRole;
import java.util.ArrayList;
@@ -27,22 +26,11 @@ import java.util.List;
public class AgentRoles {
/**
- * List of roles
+ * List of roles. The Agent provider does not have any roles by default; all roles are read from the
+ * application specification.
*/
protected static final List<ProviderRole> ROLES =
- new ArrayList<ProviderRole>();
-
- public static final int KEY_NODE =
- SliderKeys.ROLE_AM_PRIORITY_INDEX + 1;
- /**
- * Initialize role list
- */
-/*
- static {
- ROLES.add(new ProviderRole(AgentKeys.ROLE_NODE, KEY_NODE));
- }
-*/
-
+ new ArrayList<ProviderRole>();
public static List<ProviderRole> getRoles() {
return ROLES;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
index 541dcc2..cbeb69d 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
@@ -22,5 +22,16 @@ package org.apache.slider.providers.agent;
public enum Command {
NOP, // do nothing
INSTALL, // Install the component
- START // Start the component
+ START; // Start the component
+
+ public static Command getCommand(String commandVal) {
+ if (commandVal.equals(Command.START.toString())) {
+ return Command.START;
+ }
+ if (commandVal.equals(Command.INSTALL.toString())) {
+ return Command.INSTALL;
+ }
+
+ return Command.NOP;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/CommandResult.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/CommandResult.java b/slider-core/src/main/java/org/apache/slider/providers/agent/CommandResult.java
index f318096..35d9116 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/CommandResult.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/CommandResult.java
@@ -22,5 +22,19 @@ package org.apache.slider.providers.agent;
public enum CommandResult {
IN_PROGRESS, // Command is in progress
COMPLETED, // Command has successfully completed
- FAILED // Command has failed
+ FAILED; // Command has failed
+
+ public static CommandResult getCommandResult(String commandResVal) {
+ if (commandResVal.equals(CommandResult.COMPLETED.toString())) {
+ return CommandResult.COMPLETED;
+ }
+ if (commandResVal.equals(CommandResult.FAILED.toString())) {
+ return CommandResult.FAILED;
+ }
+ if (commandResVal.equals(CommandResult.IN_PROGRESS.toString())) {
+ return CommandResult.IN_PROGRESS;
+ }
+
+ throw new IllegalArgumentException("Unrecognized value " + commandResVal);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
index 2ad16af..60a6f82 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
@@ -37,6 +37,8 @@ public class ComponentInstanceState {
private State targetState = State.STARTED;
private int failuresSeen = 0;
private Boolean configReported = false;
+ private long lastHeartbeat = 0;
+ private ContainerState containerState;
public ComponentInstanceState(String compName,
String containerId,
@@ -44,6 +46,8 @@ public class ComponentInstanceState {
this.compName = compName;
this.containerId = containerId;
this.applicationId = applicationId;
+ this.containerState = ContainerState.INIT;
+ this.lastHeartbeat = System.currentTimeMillis();
}
public String getCompName() {
@@ -58,6 +62,26 @@ public class ComponentInstanceState {
this.configReported = configReported;
}
+ public ContainerState getContainerState() {
+ return containerState;
+ }
+
+ public void setContainerState(ContainerState containerState) {
+ this.containerState = containerState;
+ }
+
+ public long getLastHeartbeat() {
+ return lastHeartbeat;
+ }
+
+ public void setLastHeartbeat(long lastHeartbeat) {
+ this.lastHeartbeat = lastHeartbeat;
+ if(this.containerState == ContainerState.UNHEALTHY ||
+ this.containerState == ContainerState.INIT) {
+ this.containerState = ContainerState.HEALTHY;
+ }
+ }
+
public void commandIssued(Command command) {
Command expected = getNextCommand();
if (expected != command) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/ContainerState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/ContainerState.java b/slider-core/src/main/java/org/apache/slider/providers/agent/ContainerState.java
new file mode 100644
index 0000000..0394ba2
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/ContainerState.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+/** The states a component instance can be in. */
+public enum ContainerState {
+ INIT, // Container is not yet activated
+ HEALTHY, // Agent is heartbeating
+ UNHEALTHY, // Container is unhealthy - no heartbeat for some interval
+ HEARTBEAT_LOST; // Container is lost - request a new instance
+
+ /**
+ * Indicates whether or not it is a valid state to produce a command.
+ *
+ * @return true if command can be issued for this state.
+ */
+ public boolean canIssueCommands() {
+ switch (this) {
+ case HEALTHY:
+ return true;
+ default:
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java b/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
new file mode 100644
index 0000000..3aeff66
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Monitors the container state and heartbeats. */
+public class HeartbeatMonitor implements Runnable {
+ protected static final Logger log =
+ LoggerFactory.getLogger(HeartbeatMonitor.class);
+ private final int threadWakeupInterval; //1 minute
+ private final AgentProviderService provider;
+ private volatile boolean shouldRun = true;
+ private Thread monitorThread = null;
+
+ public HeartbeatMonitor(AgentProviderService provider, int threadWakeupInterval) {
+ this.provider = provider;
+ this.threadWakeupInterval = threadWakeupInterval;
+ }
+
+ public void shutdown() {
+ shouldRun = false;
+ }
+
+ public void start() {
+ log.info("Starting heartbeat monitor with interval {}", threadWakeupInterval);
+ monitorThread = new Thread(this);
+ monitorThread.start();
+ }
+
+ void join(long millis) throws InterruptedException {
+ if (isAlive()) {
+ monitorThread.join(millis);
+ }
+ }
+
+ public boolean isAlive() {
+ if (monitorThread != null) {
+ return monitorThread.isAlive();
+ }
+ return false;
+ }
+
+ @Override
+ public void run() {
+ while (shouldRun) {
+ try {
+ log.debug("Putting monitor to sleep for " + threadWakeupInterval + " " +
+ "milliseconds");
+ Thread.sleep(threadWakeupInterval);
+ doWork();
+ } catch (InterruptedException ex) {
+ log.warn("Scheduler thread is interrupted going to stop", ex);
+ shouldRun = false;
+ } catch (Exception ex) {
+ log.warn("Exception received", ex);
+ } catch (Throwable t) {
+ log.warn("ERROR", t);
+ }
+ }
+ }
+
+ /**
+ * Every interval the current state of each container is checked. If the state is INIT or HEALTHY and no heartbeat
+ * was received within the last check interval, the container is marked UNHEALTHY. INIT means the agent was started
+ * but has not communicated at all; HEALTHY means the AM has received heartbeats. After a further interval as
+ * UNHEALTHY the container is marked HEARTBEAT_LOST and the provider is asked to release it.
+ */
+ private void doWork() {
+ Map<String, ComponentInstanceState> componentStatuses = provider.getComponentStatuses();
+ if (componentStatuses != null) {
+ for (String containerLabel : componentStatuses.keySet()) {
+ ComponentInstanceState componentInstanceState = componentStatuses.get(containerLabel);
+ long timeSinceLastHeartbeat = System.currentTimeMillis() - componentInstanceState.getLastHeartbeat();
+
+ if (timeSinceLastHeartbeat > threadWakeupInterval) {
+ if (componentInstanceState.getContainerState() == ContainerState.HEALTHY ||
+ componentInstanceState.getContainerState() == ContainerState.INIT) {
+ componentInstanceState.setContainerState(ContainerState.UNHEALTHY);
+ log.warn("Component {} marked UNHEALTHY. Last heartbeat received at {} approx. {} ms. back.",
+ containerLabel, componentInstanceState.getLastHeartbeat(),
+ timeSinceLastHeartbeat);
+ continue;
+ }
+ if (componentInstanceState.getContainerState() == ContainerState.UNHEALTHY
+ && timeSinceLastHeartbeat > threadWakeupInterval * 2) {
+ componentInstanceState.setContainerState(ContainerState.HEARTBEAT_LOST);
+ log.warn("Component {} marked HEARTBEAT_LOST. Last heartbeat received at {} approx. {} ms. back.",
+ containerLabel, componentInstanceState.getLastHeartbeat(),
+ timeSinceLastHeartbeat);
+ this.provider.releaseContainer(containerLabel);
+ continue;
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/server/appmaster/AMViewForProviders.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/AMViewForProviders.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/AMViewForProviders.java
new file mode 100644
index 0000000..287035f
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/AMViewForProviders.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster;
+
+import org.apache.slider.core.exceptions.SliderException;
+
+/** Operations available to a provider from AppMaster */
+public interface AMViewForProviders {
+ /** Provider can ask AppMaster to release a specific container and have it reallocated */
+ void refreshContainer(String containerId, boolean newHostIfPossible) throws SliderException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 605c826..0142028 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -150,7 +150,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
ServiceStateChangeListener,
RoleKeys,
ProviderCompleted,
- ContainerStartOperation {
+ ContainerStartOperation,
+ AMViewForProviders {
protected static final Logger log =
LoggerFactory.getLogger(SliderAppMaster.class);
@@ -675,8 +676,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//Give the provider restricted access to the state, registry
- providerService.bind(stateForProviders, registry);
- sliderAMProvider.bind(stateForProviders, registry);
+ providerService.bind(stateForProviders, registry, this);
+ sliderAMProvider.bind(stateForProviders, registry, null);
// now do the registration
registerServiceInstance(clustername, appid);
@@ -1345,6 +1346,30 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
}
+
+ /* =================================================================== */
+ /* AMViewForProviders */
+ /* =================================================================== */
+
+ /**
+ * Refreshes the container by releasing it and having it reallocated
+ *
+ * @param containerId id of the container to release
+ * @param newHostIfPossible allocate the replacement container on a new host
+ *
+ * @throws SliderException
+ */
+ public void refreshContainer(String containerId, boolean newHostIfPossible)
+ throws SliderException {
+ log.info(
+ "Refreshing container {} per provider request.",
+ containerId);
+ rmOperationHandler.execute(appState.releaseContainer(containerId));
+
+ // ask for more containers if needed
+ reviewRequestAndReleaseNodes();
+ }
+
/* =================================================================== */
/* ServiceStateChangeListener */
/* =================================================================== */
@@ -1515,5 +1540,4 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//now have the service launcher do its work
ServiceLauncher.serviceMain(extendedArgs);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index b5e67f5..9981f68 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -1476,6 +1476,26 @@ public class AppState {
return operations;
}
+ /**
+ * Releases a container based on container id
+ * @param containerId
+ * @return
+ * @throws SliderInternalStateException
+ */
+ public List<AbstractRMOperation> releaseContainer(String containerId)
+ throws SliderInternalStateException {
+ List<AbstractRMOperation> operations = new ArrayList<>();
+ List<RoleInstance> activeRoleInstances = cloneActiveContainerList();
+ for (RoleInstance role : activeRoleInstances) {
+ if (role.container.getId().toString().equals(containerId)) {
+ containerReleaseSubmitted(role.container);
+ operations.add(new ContainerReleaseOperation(role.getId()));
+ }
+ }
+
+ return operations;
+ }
+
/**
* Find a container running on a specific host -looking
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/test/app_packages/test_command_log/appConfig_fast_no_reg.json
----------------------------------------------------------------------
diff --git a/slider-core/src/test/app_packages/test_command_log/appConfig_fast_no_reg.json b/slider-core/src/test/app_packages/test_command_log/appConfig_fast_no_reg.json
new file mode 100644
index 0000000..1f495c6
--- /dev/null
+++ b/slider-core/src/test/app_packages/test_command_log/appConfig_fast_no_reg.json
@@ -0,0 +1,29 @@
+{
+ "schema": "http://example.org/specification/v2.0.0",
+ "metadata": {
+ },
+ "global": {
+ "heartbeat.monitor.interval": "20000",
+ "agent.instance.debug.data": "ANY:DO_NOT_REGISTER:NONE",
+ "agent.conf": "agent.ini",
+ "application.def": "cmd_log_app_pkg.zip",
+ "config_types": "cl-site",
+ "java_home": "/usr/jdk64/jdk1.7.0_45",
+ "package_list": "files/command_log_10.tar",
+ "site.global.app_user": "yarn",
+ "site.global.application_id": "CommandLogger",
+ "site.global.app_log_dir": "${AGENT_LOG_ROOT}/app/log",
+ "site.global.app_pid_dir": "${AGENT_WORK_ROOT}/app/run",
+ "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/hbase-0.96.1-hadoop2",
+ "site.global.app_install_dir": "${AGENT_WORK_ROOT}/app/install",
+ "site.cl-site.logfile.location": "${AGENT_LOG_ROOT}/app/log/operations.log",
+ "site.cl-site.datetime.format": "%A, %d. %B %Y %I:%M%p"
+ },
+ "components": {
+ "COMMAND_LOGGER": {
+ },
+ "slider-appmaster": {
+ "jvm.heapsize": "256M"
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/test/app_packages/test_command_log/appConfig_no_hb.json
----------------------------------------------------------------------
diff --git a/slider-core/src/test/app_packages/test_command_log/appConfig_no_hb.json b/slider-core/src/test/app_packages/test_command_log/appConfig_no_hb.json
new file mode 100644
index 0000000..c824e03
--- /dev/null
+++ b/slider-core/src/test/app_packages/test_command_log/appConfig_no_hb.json
@@ -0,0 +1,29 @@
+{
+ "schema": "http://example.org/specification/v2.0.0",
+ "metadata": {
+ },
+ "global": {
+ "heartbeat.monitor.interval": "20000",
+ "agent.instance.debug.data": "ANY:DO_NOT_HEARTBEAT:DO_NOT_HEARTBEAT:NONE",
+ "agent.conf": "agent.ini",
+ "application.def": "cmd_log_app_pkg.zip",
+ "config_types": "cl-site",
+ "java_home": "/usr/jdk64/jdk1.7.0_45",
+ "package_list": "files/command_log_10.tar",
+ "site.global.app_user": "yarn",
+ "site.global.application_id": "CommandLogger",
+ "site.global.app_log_dir": "${AGENT_LOG_ROOT}/app/log",
+ "site.global.app_pid_dir": "${AGENT_WORK_ROOT}/app/run",
+ "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/hbase-0.96.1-hadoop2",
+ "site.global.app_install_dir": "${AGENT_WORK_ROOT}/app/install",
+ "site.cl-site.logfile.location": "${AGENT_LOG_ROOT}/app/log/operations.log",
+ "site.cl-site.datetime.format": "%A, %d. %B %Y %I:%M%p"
+ },
+ "components": {
+ "COMMAND_LOGGER": {
+ },
+ "slider-appmaster": {
+ "jvm.heapsize": "256M"
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
index 361fc2e..873bc93 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
@@ -34,6 +34,7 @@ import org.apache.slider.core.launch.ContainerLauncher
import org.apache.slider.core.registry.info.ServiceInstanceData
import org.apache.slider.providers.ProviderRole
import org.apache.slider.providers.ProviderService
+import org.apache.slider.server.appmaster.AMViewForProviders
import org.apache.slider.server.appmaster.state.StateAccessForProviders
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations
import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat
@@ -194,7 +195,8 @@ class MockProviderService implements ProviderService {
@Override
void bind(
StateAccessForProviders stateAccessor,
- RegistryViewForProviders registry) {
+ RegistryViewForProviders registry,
+ AMViewForProviders amView) {
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentLaunchParameter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentLaunchParameter.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentLaunchParameter.java
new file mode 100644
index 0000000..ec62b54
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentLaunchParameter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *
+ */
+public class TestAgentLaunchParameter {
+ protected static final Logger log =
+ LoggerFactory.getLogger(TestAgentLaunchParameter.class);
+
+ @Test
+ public void testTestAgentLaunchParameter() throws Exception {
+ AgentLaunchParameter alp = new AgentLaunchParameter("");
+ Assert.assertEquals("", alp.getNextLaunchParameter("abc"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("HBASE_MASTER"));
+
+ alp = new AgentLaunchParameter("a:1:2:3|b:5:6:NONE");
+ Assert.assertEquals("1", alp.getNextLaunchParameter("a"));
+ Assert.assertEquals("2", alp.getNextLaunchParameter("a"));
+ Assert.assertEquals("3", alp.getNextLaunchParameter("a"));
+ Assert.assertEquals("3", alp.getNextLaunchParameter("a"));
+
+ Assert.assertEquals("5", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("6", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("c"));
+
+ alp = new AgentLaunchParameter("|a:1:3|b::5:NONE:");
+ Assert.assertEquals("1", alp.getNextLaunchParameter("a"));
+ Assert.assertEquals("3", alp.getNextLaunchParameter("a"));
+ Assert.assertEquals("3", alp.getNextLaunchParameter("a"));
+
+ Assert.assertEquals("", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("5", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("b"));
+
+ alp = new AgentLaunchParameter("|:");
+ Assert.assertEquals("", alp.getNextLaunchParameter("b"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("a"));
+
+ alp = new AgentLaunchParameter("HBASE_MASTER:a,b:DO_NOT_REGISTER:");
+ Assert.assertEquals("a,b", alp.getNextLaunchParameter("HBASE_MASTER"));
+ Assert.assertEquals("DO_NOT_REGISTER", alp.getNextLaunchParameter("HBASE_MASTER"));
+ Assert.assertEquals("DO_NOT_REGISTER", alp.getNextLaunchParameter("HBASE_MASTER"));
+
+ alp = new AgentLaunchParameter("HBASE_MASTER:a,b:DO_NOT_REGISTER::c:::");
+ Assert.assertEquals("a,b", alp.getNextLaunchParameter("HBASE_MASTER"));
+ Assert.assertEquals("DO_NOT_REGISTER", alp.getNextLaunchParameter("HBASE_MASTER"));
+ Assert.assertEquals("", alp.getNextLaunchParameter("HBASE_MASTER"));
+ Assert.assertEquals("c", alp.getNextLaunchParameter("HBASE_MASTER"));
+ Assert.assertEquals("c", alp.getNextLaunchParameter("HBASE_MASTER"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
new file mode 100644
index 0000000..c2cfafd
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+/**
+ *
+ */
+public class TestHeartbeatMonitor {
+ protected static final Logger log =
+ LoggerFactory.getLogger(TestHeartbeatMonitor.class);
+
+ @Test
+ public void testRegularHeartbeat() throws Exception {
+ AgentProviderService provider = createNiceMock(AgentProviderService.class);
+ HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 1 * 1000);
+ Assert.assertFalse(hbm.isAlive());
+ expect(provider.getComponentStatuses()).andReturn(null).anyTimes();
+ replay(provider);
+ hbm.start();
+ Assert.assertTrue(hbm.isAlive());
+ hbm.shutdown();
+ Thread.sleep(1 * 1000);
+ Assert.assertFalse(hbm.isAlive());
+ }
+
+ @Test
+ public void testHeartbeatMonitorWithHealthy() throws Exception {
+ AgentProviderService provider = createNiceMock(AgentProviderService.class);
+ HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 500);
+ Assert.assertFalse(hbm.isAlive());
+ Map<String, ComponentInstanceState> statuses = new HashMap<>();
+ ComponentInstanceState state = new ComponentInstanceState("HBASE_MASTER", "Cid", "Aid");
+ state.setState(State.STARTED);
+ state.setLastHeartbeat(System.currentTimeMillis());
+ statuses.put("label_1", state);
+ expect(provider.getComponentStatuses()).andReturn(statuses).anyTimes();
+ replay(provider);
+ hbm.start();
+ Assert.assertTrue(hbm.isAlive());
+ Thread.sleep(1 * 1000);
+ hbm.shutdown();
+ Thread.sleep(1 * 1000);
+ Assert.assertFalse(hbm.isAlive());
+ }
+
+ @Test
+ public void testHeartbeatMonitorWithUnhealthyAndThenLost() throws Exception {
+ AgentProviderService provider = createNiceMock(AgentProviderService.class);
+ HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 2 * 1000);
+ Assert.assertFalse(hbm.isAlive());
+ Map<String, ComponentInstanceState> statuses = new HashMap<>();
+ ComponentInstanceState masterState = new ComponentInstanceState("HBASE_MASTER", "Cid1", "Aid1");
+ masterState.setState(State.STARTED);
+ masterState.setLastHeartbeat(System.currentTimeMillis());
+ statuses.put("Aid1_Cid1_HBASE_MASTER", masterState);
+
+ ComponentInstanceState slaveState = new ComponentInstanceState("HBASE_REGIONSERVER", "Cid2", "Aid1");
+ slaveState.setState(State.STARTED);
+ slaveState.setLastHeartbeat(System.currentTimeMillis());
+ statuses.put("Aid1_Cid2_HBASE_REGIONSERVER", slaveState);
+
+ expect(provider.getComponentStatuses()).andReturn(statuses).anyTimes();
+ expect(provider.releaseContainer("Aid1_Cid2_HBASE_REGIONSERVER")).andReturn(true).once();
+ replay(provider);
+ hbm.start();
+
+ Thread.sleep(1 * 1000);
+ // just dial back by at least 2 sec but no more than 4
+ slaveState.setLastHeartbeat(System.currentTimeMillis() - (2 * 1000 + 100));
+ masterState.setLastHeartbeat(System.currentTimeMillis());
+
+ Thread.sleep(1 * 1000 + 500);
+ masterState.setLastHeartbeat(System.currentTimeMillis());
+
+ log.info("Slave container state {}", slaveState.getContainerState());
+ Assert.assertEquals(ContainerState.HEALTHY, masterState.getContainerState());
+ Assert.assertEquals(ContainerState.UNHEALTHY, slaveState.getContainerState());
+
+ Thread.sleep(1 * 1000);
+ // some lost heartbeats are ignored (e.g. ~ 1 sec)
+ masterState.setLastHeartbeat(System.currentTimeMillis() - 1 * 1000);
+
+ Thread.sleep(1 * 1000 + 500);
+
+ log.info("Slave container state {}", slaveState.getContainerState());
+ Assert.assertEquals(ContainerState.HEALTHY, masterState.getContainerState());
+ Assert.assertEquals(ContainerState.HEARTBEAT_LOST, slaveState.getContainerState());
+ hbm.shutdown();
+ }
+
+ @Test
+ public void testHeartbeatTransitions() {
+ ComponentInstanceState slaveState = new ComponentInstanceState("HBASE_REGIONSERVER", "Cid2", "Aid1");
+ slaveState.setState(State.STARTED);
+
+ Assert.assertEquals(ContainerState.INIT, slaveState.getContainerState());
+ slaveState.setLastHeartbeat(System.currentTimeMillis());
+ Assert.assertEquals(ContainerState.HEALTHY, slaveState.getContainerState());
+
+ slaveState.setContainerState(ContainerState.UNHEALTHY);
+ Assert.assertEquals(ContainerState.UNHEALTHY, slaveState.getContainerState());
+ slaveState.setLastHeartbeat(System.currentTimeMillis());
+ Assert.assertEquals(ContainerState.HEALTHY, slaveState.getContainerState());
+
+ slaveState.setContainerState(ContainerState.HEARTBEAT_LOST);
+ Assert.assertEquals(ContainerState.HEARTBEAT_LOST, slaveState.getContainerState());
+ slaveState.setLastHeartbeat(System.currentTimeMillis());
+ Assert.assertEquals(ContainerState.HEARTBEAT_LOST, slaveState.getContainerState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/publisher/TestAgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/publisher/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/publisher/TestAgentProviderService.java
index 97199f4..2427009 100644
--- a/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/publisher/TestAgentProviderService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/appmaster/web/rest/publisher/TestAgentProviderService.java
@@ -17,6 +17,7 @@
package org.apache.slider.server.appmaster.web.rest.publisher;
import org.apache.slider.providers.agent.AgentProviderService;
+import org.apache.slider.server.appmaster.AMViewForProviders;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.services.registry.RegistryViewForProviders;
import org.slf4j.Logger;
@@ -39,8 +40,8 @@ public class TestAgentProviderService extends AgentProviderService{
@Override
public void bind(StateAccessForProviders stateAccessor,
- RegistryViewForProviders reg) {
- super.bind(stateAccessor, reg);
+ RegistryViewForProviders reg, AMViewForProviders amView) {
+ super.bind(stateAccessor, reg, amView);
Map<String,String> dummyProps = new HashMap<>();
dummyProps.put("prop1", "val1");
dummyProps.put("prop2", "val2");
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AgentCommandTestBase.groovy
----------------------------------------------------------------------
diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AgentCommandTestBase.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AgentCommandTestBase.groovy
index d5a91f0..6333fad 100644
--- a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AgentCommandTestBase.groovy
+++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/AgentCommandTestBase.groovy
@@ -20,6 +20,7 @@ package org.apache.slider.funtest.lifecycle
import groovy.util.logging.Slf4j
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.AccessControlException
import org.apache.slider.common.SliderExitCodes
import org.apache.slider.common.params.Arguments
import org.apache.slider.common.params.SliderActions
@@ -28,22 +29,17 @@ import org.apache.slider.funtest.framework.FuntestProperties
import org.apache.slider.funtest.framework.SliderShell
import org.apache.tools.zip.ZipEntry
import org.apache.tools.zip.ZipOutputStream
-import org.junit.Assert
-import org.junit.Assume
import org.junit.Before
import org.junit.BeforeClass
-import org.junit.rules.TemporaryFolder
import org.junit.Rule
-
-import org.apache.hadoop.security.AccessControlException
-
+import org.junit.rules.TemporaryFolder
@Slf4j
class AgentCommandTestBase extends CommandTestBase
- implements FuntestProperties, Arguments, SliderExitCodes, SliderActions {
+implements FuntestProperties, Arguments, SliderExitCodes, SliderActions {
public static final boolean AGENTTESTS_ENABLED
-
+
protected static String APP_RESOURCE = "../slider-core/src/test/app_packages/test_command_log/resources.json"
protected static String APP_TEMPLATE = "../slider-core/src/test/app_packages/test_command_log/appConfig.json"
protected static String APP_PKG_DIR = "../slider-core/src/test/app_packages/test_command_log/"
@@ -85,7 +81,7 @@ class AgentCommandTestBase extends CommandTestBase
} catch (AccessControlException ace) {
log.info "No write access to test user home directory. " +
"Ensure home directory exists and has correct permissions." + ace.getMessage()
- Assume.assumeTrue("Ensure home directory exists and has correct permissions for test user.", false)
+ fail("Ensure home directory exists and has correct permissions for test user.")
}
}
@@ -135,11 +131,11 @@ class AgentCommandTestBase extends CommandTestBase
assert instanceCount == count, 'Instance count for component did not match expected. Parsed: ' + entry
}
- public static String findLineEntry(SliderShell shell, String[] locators) {
+ public static String findLineEntry(SliderShell shell, String[] locaters) {
int index = 0;
for (String str in shell.out) {
- if (str.contains("\"" + locators[index] + "\"")) {
- if (locators.size() == index + 1) {
+ if (str.contains("\"" + locaters[index] + "\"")) {
+ if (locaters.size() == index + 1) {
return str;
} else {
index++;
@@ -150,8 +146,29 @@ class AgentCommandTestBase extends CommandTestBase
return null;
}
- public static boolean isAppRunning(String text, SliderShell shell) {
+ /**
+  * Locate a line of shell output via findLineEntry() and return the text
+  * that follows its first ':' character, trimmed and with a single trailing
+  * ',' removed.
+  *
+  * @param shell shell whose output is searched
+  * @param locaters quoted keys that must be matched in order
+  * @return the extracted value, or null when no line matched or the line has
+  *         no ':' after its first character
+  */
+ public static String findLineEntryValue(SliderShell shell, String[] locaters) {
+   String entry = findLineEntry(shell, locaters);
+   if (entry == null) {
+     return null;
+   }
+
+   log.info("Parsing {} for value.", entry)
+   int colonAt = entry.indexOf(":");
+   if (colonAt <= 0) {
+     return null;
+   }
+
+   String value = entry.substring(colonAt + 1).trim()
+   // drop the separator that follows a non-final JSON entry
+   return value.endsWith(",") ? value.substring(0, value.length() - 1) : value
+ }
+
+ public static boolean isApplicationInState(String text, String applicationName) {
boolean exists = false
+ SliderShell shell = slider(EXIT_SUCCESS,
+ [
+ ACTION_LIST,
+ applicationName])
for (String str in shell.out) {
if (str.contains(text)) {
exists = true
@@ -161,23 +178,14 @@ class AgentCommandTestBase extends CommandTestBase
return exists
}
- protected static void ensureApplicationIsUp(String clusterName, int maxAttemptCount = 15) {
- SliderShell shell
- int attemptCount = 0
- while (attemptCount < maxAttemptCount) {
- shell = slider(EXIT_SUCCESS, [
- ACTION_LIST,
- clusterName])
-
- if (isAppRunning("RUNNING", shell)) {
- break
- }
-
- attemptCount++
- assert attemptCount != maxAttemptCount, 'Application did not start, aborting test.'
+ /**
+  * Poll isApplicationUp() for the given cluster, up to 15 attempts with a
+  * 3 second pause between them, asking repeatUntilTrue() to fail the test
+  * if the application never comes up.
+  *
+  * @param clusterName name of the application instance to wait for
+  */
+ protected void ensureApplicationIsUp(String clusterName) {
+   Map<String, String> pollArgs = ['arg1': clusterName]
+   repeatUntilTrue(this.&isApplicationUp, 15, 1000 * 3, pollArgs,
+       true, 'Application did not start, aborting test.')
+ }
- sleep(1000 * 3)
- }
+ /**
+  * Predicate in the shape expected by repeatUntilTrue().
+  *
+  * @param args map whose 'arg1' entry is the application name
+  * @return true when the application is listed in the RUNNING state
+  */
+ boolean isApplicationUp(Map<String, String> args) {
+   return isApplicationInState("RUNNING", args['arg1']);
}
public static void addDir(File dirObj, ZipOutputStream zipFile, String prefix) {
@@ -204,4 +212,38 @@ class AgentCommandTestBase extends CommandTestBase
out.close();
}
}
+
+ /**
+  * Invoke a predicate repeatedly until it returns true or the attempts run out.
+  *
+  * The pause is taken only between attempts, never after the last one. When
+  * failIfUnsuccessful is set, the test fails whenever the predicate never
+  * succeeded, including when maxAttempts is zero or negative.
+  *
+  * @param c predicate closure, called with args as its single argument
+  * @param maxAttempts maximum number of times the predicate is invoked
+  * @param sleepDur pause between attempts, in milliseconds
+  * @param args argument map handed to the predicate on every call
+  * @param failIfUnsuccessful whether to fail when the predicate never succeeds
+  * @param message assertion message used for that failure
+  */
+ protected void repeatUntilTrue(Closure c, int maxAttempts, int sleepDur, Map args,
+     boolean failIfUnsuccessful = false, String message = "") {
+   boolean succeeded = false
+   int attemptCount = 0
+   while (attemptCount < maxAttempts) {
+     if (c(args)) {
+       succeeded = true
+       break
+     }
+     attemptCount++
+
+     // no point in waiting once there is no further attempt to wait for
+     if (attemptCount < maxAttempts) {
+       sleep(sleepDur)
+     }
+   }
+
+   if (failIfUnsuccessful) {
+     assert succeeded, message
+   }
+ }
+
+ /**
+  * Tear down and destroy the named application instance, if it exists.
+  *
+  * The destroy call may return either 0 or EXIT_UNKNOWN_INSTANCE; any other
+  * exit code logs the shell output and fails the test.
+  *
+  * @param applicationName name of the application instance to remove
+  */
+ protected void cleanup(String applicationName) throws Throwable {
+   log.info "Cleaning app instance, if exists, by name " + applicationName
+   teardown(applicationName)
+
+   // sleep till the instance is frozen
+   sleep(1000 * 3)
+
+   SliderShell shell = slider([
+       ACTION_DESTROY,
+       applicationName])
+
+   if (shell.ret != 0 && shell.ret != EXIT_UNKNOWN_INSTANCE) {
+     logShell(shell)
+     // fail() throws by itself; wrapping it in an assert added nothing
+     fail("Old cluster either should not exist or should get destroyed.")
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentClusterLifecycle.groovy
----------------------------------------------------------------------
diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentClusterLifecycle.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentClusterLifecycle.groovy
index 3d0b270..06809cc 100644
--- a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentClusterLifecycle.groovy
+++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentClusterLifecycle.groovy
@@ -51,7 +51,7 @@ implements FuntestProperties, Arguments, SliderExitCodes, SliderActions {
@After
public void destroyCluster() {
- teardown(CLUSTER)
+ cleanup(CLUSTER)
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures.groovy
----------------------------------------------------------------------
diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures.groovy
new file mode 100644
index 0000000..96a99ad
--- /dev/null
+++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures.groovy
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.funtest.lifecycle
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import org.apache.slider.common.SliderExitCodes
+import org.apache.slider.common.params.Arguments
+import org.apache.slider.common.params.SliderActions
+import org.apache.slider.funtest.framework.FuntestProperties
+import org.apache.slider.funtest.framework.SliderShell
+import org.junit.After
+import org.junit.Test
+
+ /**
+  * Functional test that creates an application from the
+  * appConfig_fast_no_reg.json template and checks that at least two
+  * containers end up being requested while the application stays RUNNING
+  * with a single COMMAND_LOGGER instance.
+  *
+  * NOTE(review): presumably the template makes the first agent skip
+  * registration so that its container is replaced; confirm against the
+  * template file.
+  */
+ @CompileStatic
+ @Slf4j
+ public class TestAgentFailures extends AgentCommandTestBase
+ implements FuntestProperties, Arguments, SliderExitCodes, SliderActions {
+
+   private static String COMMAND_LOGGER = "COMMAND_LOGGER"
+   private static String APPLICATION_NAME = "one-container-fail-register"
+   private static String APP_TEMPLATE2 =
+       "../slider-core/src/test/app_packages/test_command_log/appConfig_fast_no_reg.json"
+
+
+   /** Remove the application instance after each test. */
+   @After
+   public void destroyCluster() {
+     cleanup(APPLICATION_NAME)
+   }
+
+   /**
+    * Create the application, wait until at least two containers have been
+    * requested, then verify the component count, the requested-container
+    * statistic and the application state.
+    */
+   @Test
+   public void testAgentFailRegistrationOnce() throws Throwable {
+     if (!AGENTTESTS_ENABLED) {
+       log.info "TESTS are not run."
+       return
+     }
+
+     cleanup(APPLICATION_NAME)
+     SliderShell createShell = slider(EXIT_SUCCESS,
+         [
+             ACTION_CREATE, APPLICATION_NAME,
+             ARG_IMAGE, agentTarballPath.toString(),
+             ARG_TEMPLATE, APP_TEMPLATE2,
+             ARG_RESOURCES, APP_RESOURCE
+         ])
+
+     logShell(createShell)
+
+     ensureApplicationIsUp(APPLICATION_NAME)
+
+     repeatUntilTrue(this.&hasContainerCountExceeded, 15, 1000 * 10, ['arg1': '2']);
+
+     sleep(1000 * 20)
+
+     SliderShell statusShell = slider(EXIT_SUCCESS,
+         [
+             ACTION_STATUS,
+             APPLICATION_NAME])
+
+     assertComponentCount(COMMAND_LOGGER, 1, statusShell)
+     String requested = requestedContainers(statusShell)
+     assert requested != null && requested.isInteger() && requested.toInteger() >= 2,
+         'At least 2 containers must be requested'
+
+     assert isApplicationInState("RUNNING", APPLICATION_NAME), 'App is not running.'
+
+     assertSuccess(statusShell)
+   }
+
+
+   /**
+    * Predicate for repeatUntilTrue(): query the application status and report
+    * whether the requested-container statistic has reached the given count.
+    *
+    * @param args map whose 'arg1' entry is the expected count as a string
+    * @return true when the statistic is an integer of at least that count
+    */
+   boolean hasContainerCountExceeded(Map<String, String> args) {
+     int expectedCount = args['arg1'].toInteger();
+     SliderShell statusShell = slider(EXIT_SUCCESS,
+         [
+             ACTION_STATUS,
+             APPLICATION_NAME])
+
+     String requested = requestedContainers(statusShell)
+     return requested != null && requested.isInteger() && requested.toInteger() >= expectedCount
+   }
+
+   /** Read the containers.requested statistic for COMMAND_LOGGER from a status output. */
+   private String requestedContainers(SliderShell statusShell) {
+     return findLineEntryValue(
+         statusShell, ["statistics", COMMAND_LOGGER, "containers.requested"] as String[])
+   }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3aca57d2/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures2.groovy
----------------------------------------------------------------------
diff --git a/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures2.groovy b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures2.groovy
new file mode 100644
index 0000000..644fa4f
--- /dev/null
+++ b/slider-funtest/src/test/groovy/org/apache/slider/funtest/lifecycle/TestAgentFailures2.groovy
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.funtest.lifecycle
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import org.apache.slider.common.SliderExitCodes
+import org.apache.slider.common.params.Arguments
+import org.apache.slider.common.params.SliderActions
+import org.apache.slider.funtest.framework.FuntestProperties
+import org.apache.slider.funtest.framework.SliderShell
+import org.junit.After
+import org.junit.Test
+
+ /**
+  * Functional test that creates an application from the appConfig_no_hb.json
+  * template and checks that at least three containers end up being requested
+  * while the application stays RUNNING with a single COMMAND_LOGGER instance.
+  *
+  * NOTE(review): presumably the template makes the first two agents stop
+  * heartbeating so that their containers are replaced; confirm against the
+  * template file.
+  */
+ @CompileStatic
+ @Slf4j
+ public class TestAgentFailures2 extends AgentCommandTestBase
+ implements FuntestProperties, Arguments, SliderExitCodes, SliderActions {
+
+   private static String COMMAND_LOGGER = "COMMAND_LOGGER"
+   private static String APPLICATION_NAME = "two-container-fail-heartbeat"
+   private static String APP_TEMPLATE3 =
+       "../slider-core/src/test/app_packages/test_command_log/appConfig_no_hb.json"
+
+
+   /** Remove the application instance after each test. */
+   @After
+   public void destroyCluster() {
+     cleanup(APPLICATION_NAME)
+   }
+
+   /**
+    * Create the application, wait until at least three containers have been
+    * requested, then verify the component count, the requested-container
+    * statistic and the application state.
+    */
+   @Test
+   public void testAgentFailHeartbeatingTwiceOnce() throws Throwable {
+     if (!AGENTTESTS_ENABLED) {
+       log.info "TESTS are not run."
+       return
+     }
+
+     cleanup(APPLICATION_NAME)
+     SliderShell shell = slider(EXIT_SUCCESS,
+         [
+             ACTION_CREATE, APPLICATION_NAME,
+             ARG_IMAGE, agentTarballPath.toString(),
+             ARG_TEMPLATE, APP_TEMPLATE3,
+             ARG_RESOURCES, APP_RESOURCE
+         ])
+
+     logShell(shell)
+
+     ensureApplicationIsUp(APPLICATION_NAME)
+
+     repeatUntilTrue(this.&hasContainerCountExceeded, 20, 1000 * 10, ['arg1': '3']);
+
+     sleep(1000 * 20)
+
+     shell = slider(EXIT_SUCCESS,
+         [
+             ACTION_STATUS,
+             APPLICATION_NAME])
+
+     assertComponentCount(COMMAND_LOGGER, 1, shell)
+     String requested = findLineEntryValue(shell, ["statistics", COMMAND_LOGGER, "containers.requested"] as String[])
+     // the message now states the same threshold the condition checks
+     assert requested != null && requested.isInteger() && requested.toInteger() >= 3,
+         'At least 3 containers must be requested'
+
+     assert isApplicationInState("RUNNING", APPLICATION_NAME), 'App is not running.'
+
+     assertSuccess(shell)
+   }
+
+
+   /**
+    * Predicate for repeatUntilTrue(): query the application status and report
+    * whether the requested-container statistic has reached the given count.
+    *
+    * @param args map whose 'arg1' entry is the expected count as a string
+    * @return true when the statistic is an integer of at least that count
+    */
+   boolean hasContainerCountExceeded(Map<String, String> args) {
+     int expectedCount = args['arg1'].toInteger();
+     SliderShell shell = slider(EXIT_SUCCESS,
+         [
+             ACTION_STATUS,
+             APPLICATION_NAME])
+
+     String requested = findLineEntryValue(
+         shell, ["statistics", COMMAND_LOGGER, "containers.requested"] as String[])
+     if (requested != null && requested.isInteger() && requested.toInteger() >= expectedCount) {
+       return true
+     }
+
+     return false
+   }
+}