You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2014/08/13 00:18:20 UTC
git commit: AMBARI-6837. Cancel background tasks for Rebalancer
should follow the usual cancel request paradigm.
Repository: ambari
Updated Branches:
refs/heads/trunk 221f0e511 -> 2aee43d38
AMBARI-6837. Cancel background tasks for Rebalancer should follow the usual cancel request paradigm.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2aee43d3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2aee43d3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2aee43d3
Branch: refs/heads/trunk
Commit: 2aee43d386e36f8c4999409db02078eb36c08e04
Parents: 221f0e5
Author: Mahadev Konar <ma...@apache.org>
Authored: Tue Aug 12 14:47:24 2014 -0700
Committer: Mahadev Konar <ma...@apache.org>
Committed: Tue Aug 12 14:47:24 2014 -0700
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 26 +++--
.../ambari_agent/CustomServiceOrchestrator.py | 76 ++++++-------
.../main/python/ambari_agent/PythonExecutor.py | 2 +-
.../test/python/ambari_agent/TestActionQueue.py | 73 +++----------
.../TestCustomServiceOrchestrator.py | 69 ++++++++++++
.../ambari/server/actionmanager/Stage.java | 107 +++++++++----------
.../system_action_definitions.xml | 10 --
.../custom_actions/cancel_background_task.py | 41 -------
.../services/HDFS/package/scripts/namenode.py | 15 ++-
.../BackgroundCustomCommandExecutionTest.java | 51 ---------
ambari-web/app/controllers/main/service/item.js | 2 +-
ambari-web/app/utils/ajax/ajax.js | 18 ++--
ambari-web/app/utils/host_progress_popup.js | 7 +-
13 files changed, 203 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 6437036..476955f 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -55,11 +55,9 @@ class ActionQueue(threading.Thread):
STATUS_COMMAND = 'STATUS_COMMAND'
EXECUTION_COMMAND = 'EXECUTION_COMMAND'
BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
- CANCEL_BACKGROUND_EXECUTION_COMMAND = 'CANCEL_BACKGROUND_EXECUTION_COMMAND'
ROLE_COMMAND_INSTALL = 'INSTALL'
ROLE_COMMAND_START = 'START'
ROLE_COMMAND_STOP = 'STOP'
- ROLE_COMMAND_CANCEL = 'CANCEL'
ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
CUSTOM_COMMAND_RESTART = 'RESTART'
@@ -80,7 +78,7 @@ class ActionQueue(threading.Thread):
self.configTags = {}
self._stop = threading.Event()
self.tmpdir = config.get('agent', 'prefix')
- self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.commandStatuses)
+ self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
def stop(self):
@@ -172,7 +170,7 @@ class ActionQueue(threading.Thread):
def createCommandHandle(self, command):
if(command.has_key('__handle')):
raise AgentException("Command already has __handle")
- command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], self.on_background_command_started, self.on_background_command_complete_callback)
+ command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], None, self.on_background_command_complete_callback)
return command
def process_command(self, command):
@@ -281,20 +279,26 @@ class ActionQueue(threading.Thread):
self.commandStatuses.put_command_status(command, roleResult)
- def on_background_command_started(self, handle):
- #update command with given handle
- self.commandStatuses.update_command_status(handle.command, {'pid' : handle.pid})
-
-
+ def command_was_canceled(self):
+ self.customServiceOrchestrator
def on_background_command_complete_callback(self, process_condenced_result, handle):
logger.debug('Start callback: %s' % process_condenced_result)
logger.debug('The handle is: %s' % handle)
status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
+
+ aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId'])
+ if aborted_postfix:
+ status = self.FAILED_STATUS
+ logger.debug('Set status to: %s , reason = %s' % (status, aborted_postfix))
+ else:
+ aborted_postfix = ''
+
+
roleResult = self.commandStatuses.generate_report_template(handle.command)
roleResult.update({
- 'stdout': process_condenced_result['stdout'],
- 'stderr': process_condenced_result['stderr'],
+ 'stdout': process_condenced_result['stdout'] + aborted_postfix,
+ 'stderr': process_condenced_result['stderr'] + aborted_postfix,
'exitCode': process_condenced_result['exitcode'],
'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
'status': status,
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 093fc22..94aa87e 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -23,6 +23,7 @@ import os
import json
import sys
import shell
+import threading
from FileCache import FileCache
from AgentException import AgentException
@@ -50,7 +51,7 @@ class CustomServiceOrchestrator():
PING_PORTS_KEY = "all_ping_ports"
AMBARI_SERVER_HOST = "ambari_server_host"
- def __init__(self, config, controller, commandStatuses = None):
+ def __init__(self, config, controller):
self.config = config
self.tmp_dir = config.get('agent', 'prefix')
self.exec_tmp_dir = config.get('agent', 'tmp_dir')
@@ -64,26 +65,31 @@ class CustomServiceOrchestrator():
# cache reset will be called on every agent registration
controller.registration_listeners.append(self.file_cache.reset)
- self.commandStatuses = commandStatuses
# Clean up old status command files if any
try:
os.unlink(self.status_commands_stdout)
os.unlink(self.status_commands_stderr)
except OSError:
pass # Ignore fail
+ self.commands_in_progress_lock = threading.RLock()
self.commands_in_progress = {}
def map_task_to_process(self, task_id, processId):
- self.commands_in_progress[task_id] = processId
+ with self.commands_in_progress_lock:
+ logger.debug('Maps taskId=%s to pid=%s'%(task_id, processId))
+ self.commands_in_progress[task_id] = processId
def cancel_command(self, task_id, reason):
- if task_id in self.commands_in_progress.keys():
- pid = self.commands_in_progress.get(task_id)
- self.commands_in_progress[task_id] = reason
- logger.info("Canceling command with task_id - {tid}, " \
- "reason - {reason} . Killing process {pid}"
- .format(tid = str(task_id), reason = reason, pid = pid))
- shell.kill_process_with_children(pid)
+ with self.commands_in_progress_lock:
+ if task_id in self.commands_in_progress.keys():
+ pid = self.commands_in_progress.get(task_id)
+ self.commands_in_progress[task_id] = reason
+ logger.info("Canceling command with task_id - {tid}, " \
+ "reason - {reason} . Killing process {pid}"
+ .format(tid = str(task_id), reason = reason, pid = pid))
+ shell.kill_process_with_children(pid)
+ else:
+ logger.warn("Unable to find pid by taskId = %s"%task_id)
def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
override_output_files = True):
@@ -95,7 +101,6 @@ class CustomServiceOrchestrator():
script_type = command['commandParams']['script_type']
script = command['commandParams']['script']
timeout = int(command['commandParams']['command_timeout'])
- before_interceptor_method = command['commandParams']['before_system_hook_function'] if command['commandParams'].has_key('before_system_hook_function') else None
if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
server_url_prefix = command['hostLevelParams']['jdk_location']
@@ -114,12 +119,6 @@ class CustomServiceOrchestrator():
if command_name == self.CUSTOM_ACTION_COMMAND:
base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
script_tuple = (os.path.join(base_dir, script) , base_dir)
-
- # Call systemHook functions in current virtual machine. This function can enrich custom action
- # command with some information from current machine. And can be considered as plugin
- if before_interceptor_method != None:
- self.processSystemHookFunctions(script_tuple, before_interceptor_method, command)
-
hook_dir = None
else:
if command_name == self.CUSTOM_COMMAND_COMMAND:
@@ -140,6 +139,7 @@ class CustomServiceOrchestrator():
handle = None
if(command.has_key('__handle')):
handle = command['__handle']
+ handle.on_background_command_started = self.map_task_to_process
del command['__handle']
json_path = self.dump_command_to_json(command)
@@ -155,7 +155,6 @@ class CustomServiceOrchestrator():
# Executing hooks and script
ret = None
-
from ActionQueue import ActionQueue
if(command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1):
raise AgentException("Background commands are supported without hooks only")
@@ -174,18 +173,17 @@ class CustomServiceOrchestrator():
if not ret: # Something went wrong
raise AgentException("No script has been executed")
- # if canceled
- if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
- pid = self.commands_in_progress.pop(task_id)
- if not isinstance(pid, int):
- reason = '\nCommand aborted. ' + pid
- ret['stdout'] += reason
- ret['stderr'] += reason
+ # if canceled and not background command
+ if handle is None:
+ cancel_reason = self.command_canceled_reason(task_id)
+ if cancel_reason:
+ ret['stdout'] += cancel_reason
+ ret['stderr'] += cancel_reason
with open(tmpoutfile, "a") as f:
- f.write(reason)
+ f.write(cancel_reason)
with open(tmperrfile, "a") as f:
- f.write(reason)
+ f.write(cancel_reason)
except Exception: # We do not want to let agent fail completely
exc_type, exc_obj, exc_tb = sys.exc_info()
@@ -199,21 +197,15 @@ class CustomServiceOrchestrator():
'exitcode': 1,
}
return ret
-
- def fetch_bg_pid_by_taskid(self,command):
- cancel_command_pid = None
- try:
- cancelTaskId = int(command['commandParams']['cancel_task_id'])
- status = self.commandStatuses.get_command_status(cancelTaskId)
- cancel_command_pid = status['pid']
- except Exception:
- pass
- logger.info("Found PID=%s for cancel taskId=%s" % (cancel_command_pid,cancelTaskId))
- command['commandParams']['cancel_command_pid'] = cancel_command_pid
-
- def processSystemHookFunctions(self, script_tuple, before_interceptor_method, command):
- getattr(self, before_interceptor_method)(command)
-
+ def command_canceled_reason(self, task_id):
+ with self.commands_in_progress_lock:
+ if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
+ logger.debug('Pop with taskId %s' % task_id)
+ pid = self.commands_in_progress.pop(task_id)
+ if not isinstance(pid, int):
+ return '\nCommand aborted. ' + pid
+ return None
+
def requestComponentStatus(self, command):
"""
Component status is determined by exit code, returned by runCommand().
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index d130497..874b70b 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -196,7 +196,7 @@ class BackgroundThread(threading.Thread):
self.holder.handle.pid = process.pid
self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
- self.holder.handle.on_background_command_started(self.holder.handle)
+ self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid)
process.communicate()
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 4447670..f582a68 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -618,12 +618,6 @@ class TestActionQueue(TestCase):
result = {}
lock = threading.RLock()
complete_done = threading.Condition(lock)
- start_done = threading.Condition(lock)
-
- def command_started_w(handle):
- with lock:
- result['command_started'] = {'handle': copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])}
- start_done.notifyAll()
def command_complete_w(process_condenced_result, handle):
with lock:
@@ -634,26 +628,13 @@ class TestActionQueue(TestCase):
complete_done.notifyAll()
actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
- actionQueue.on_background_command_started = wraped(actionQueue.on_background_command_started,None,command_started_w)
actionQueue.put([self.background_command])
actionQueue.processBackgroundQueueSafeEmpty();
actionQueue.processStatusCommandQueueSafeEmpty();
with lock:
- start_done.wait(5)
-
- self.assertTrue(result.has_key('command_started'), 'command started callback was not fired')
- started_handle = result['command_started']['handle']
- started_status = result['command_started']['command_status']
-
- self.assertEqual(started_handle.pid, started_status['pid'])
- self.assertTrue(started_handle.pid > 0, "PID was not assigned to handle")
- self.assertEqual(started_status['status'], ActionQueue.IN_PROGRESS_STATUS)
-
- complete_done.wait(2)
+ complete_done.wait(.1)
- finished_handle = result['command_complete']['handle']
- self.assertEqual(started_handle.pid, finished_handle.pid)
finished_status = result['command_complete']['command_status']
self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
self.assertEqual(finished_status['stdout'], 'process_out')
@@ -667,46 +648,24 @@ class TestActionQueue(TestCase):
report = actionQueue.result()
self.assertEqual(len(report['reports']),1)
self.assertEqual(report['reports'][0]['stdout'],'process_out')
-# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
-
-
- @patch.object(StackVersionsFileHandler, "read_stack_version")
- @patch.object(FileCache, "__init__")
- def test_cancel_backgound_command(self, read_stack_version_mock, FileCache_mock):
- FileCache_mock.return_value = None
+# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
- dummy_controller = MagicMock()
- cfg = AmbariConfig().getConfig()
- cfg.set('agent', 'tolerate_download_failures', 'true')
- cfg.set('agent', 'prefix', '.')
- cfg.set('agent', 'cache_dir', 'background_tasks')
- actionQueue = ActionQueue(cfg, dummy_controller)
- patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
- actionQueue.customServiceOrchestrator.python_executor.prepare_process_result = MagicMock()
- actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+
+ cancel_background_command = {
+ "commandType":"CANCEL_COMMAND",
+ "role":"AMBARI_SERVER_ACTION",
+ "roleCommand":"ABORT",
+ "commandId":"2--1",
+ "taskId":20,
+ "clusterName":"c1",
+ "serviceName":"",
+ "hostname":"c6401",
+ "roleParams":{
+ "cancelTaskIdTargets":"13,14"
+ },
+ }
- lock = threading.RLock()
- complete_done = threading.Condition(lock)
-
- def command_complete_w(process_condenced_result, handle):
- with lock:
- complete_done.wait(4)
-
- actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
- execute_command = copy.deepcopy(self.background_command)
- actionQueue.put([execute_command])
- actionQueue.processBackgroundQueueSafeEmpty();
-
- time.sleep(1)
-
- actionQueue.process_command(self.cancel_background_command)
- #TODO add assert
-
- with lock:
- complete_done.notifyAll()
-
-
def patch_output_file(pythonExecutor):
def windows_py(command, tmpout, tmperr):
proc = MagicMock()
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index a1e1c66..92791e2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -40,6 +40,7 @@ from AgentException import AgentException
from FileCache import FileCache
from LiveStatus import LiveStatus
from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
+from ambari_agent.ActionQueue import ActionQueue
class TestCustomServiceOrchestrator(TestCase):
@@ -304,6 +305,74 @@ class TestCustomServiceOrchestrator(TestCase):
self.assertTrue(os.path.exists(err))
os.remove(out)
os.remove(err)
+
+ from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
+
+ @patch("shell.kill_process_with_children")
+ @patch.object(FileCache, "__init__")
+ @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+ @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
+ def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
+ kill_process_with_children_mock):
+ FileCache_mock.return_value = None
+ FileCache_mock.cache_dir = MagicMock()
+ resolve_hook_script_path_mock.return_value = None
+# shell.kill_process_with_children = MagicMock()
+ dummy_controller = MagicMock()
+ cfg = AmbariConfig().getConfig()
+ cfg.set('agent', 'tolerate_download_failures', 'true')
+ cfg.set('agent', 'prefix', '.')
+ cfg.set('agent', 'cache_dir', 'background_tasks')
+
+ actionQueue = ActionQueue(cfg, dummy_controller)
+
+ dummy_controller.actionQueue = actionQueue
+ orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
+ orchestrator.file_cache = MagicMock()
+ def f (a, b):
+ return ""
+ orchestrator.file_cache.get_service_base_dir = f
+ actionQueue.customServiceOrchestrator = orchestrator
+
+ import TestActionQueue
+ import copy
+
+ TestActionQueue.patch_output_file(orchestrator.python_executor)
+ orchestrator.python_executor.prepare_process_result = MagicMock()
+ orchestrator.dump_command_to_json = MagicMock()
+
+ lock = threading.RLock()
+ complete_done = threading.Condition(lock)
+
+ complete_was_called = {}
+ def command_complete_w(process_condenced_result, handle):
+ with lock:
+ complete_was_called['visited']= ''
+ complete_done.wait(3)
+
+ actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None)
+ execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
+ actionQueue.put([execute_command])
+ actionQueue.processBackgroundQueueSafeEmpty()
+
+ time.sleep(.1)
+
+ orchestrator.cancel_command(19,'')
+ self.assertTrue(kill_process_with_children_mock.called)
+ kill_process_with_children_mock.assert_called_with(33)
+
+ with lock:
+ complete_done.notifyAll()
+
+ with lock:
+ self.assertTrue(complete_was_called.has_key('visited'))
+
+ time.sleep(.1)
+
+ runningCommand = actionQueue.commandStatuses.get_command_status(19)
+ self.assertTrue(runningCommand is not None)
+ self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)
@patch.object(CustomServiceOrchestrator, "dump_command_to_json")
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 83d46eb..c4bbb46 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -38,10 +39,13 @@ import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.serveraction.ServerAction;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.fsm.event.Event;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
@@ -216,51 +220,58 @@ public class Stage {
public String getActionId() {
return StageUtils.getActionId(requestId, getStageId());
}
-
- /**
- * A new host role command is created for execution.
- * Creates both ExecutionCommand and HostRoleCommand objects and
- * adds them to the Stage. This should be called only once for a host-role
- * for a given stage.
- */
- public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command,
- ServiceComponentHostEvent event, String clusterName, String serviceName) {
+
+ private synchronized ExecutionCommandWrapper addGenericExecutionCommand(String clusterName, String hostName, Role role, RoleCommand command, ServiceComponentHostEvent event){
//used on stage creation only, no need to check if wrappers loaded
- HostRoleCommand hrc = new HostRoleCommand(host, role, event, command);
+ HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command);
ExecutionCommand cmd = new ExecutionCommand();
ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd);
hrc.setExecutionCommandWrapper(wrapper);
- cmd.setHostname(host);
+ cmd.setHostname(hostName);
cmd.setClusterName(clusterName);
- cmd.setServiceName(serviceName);
cmd.setCommandId(this.getActionId());
cmd.setRole(role.name());
cmd.setRoleCommand(command);
- Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(host);
+ cmd.setServiceName("");
+
+ Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
if (hrcMap == null) {
hrcMap = new LinkedHashMap<String, HostRoleCommand>();
- this.hostRoleCommands.put(host, hrcMap);
+ this.hostRoleCommands.put(hostName, hrcMap);
}
if (hrcMap.get(role.toString()) != null) {
throw new RuntimeException(
"Setting the host role command second time for same stage: stage="
- + this.getActionId() + ", host=" + host + ", role=" + role);
+ + this.getActionId() + ", host=" + hostName + ", role=" + role);
}
hrcMap.put(role.toString(), hrc);
- List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(host);
+ List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
if (execCmdList == null) {
execCmdList = new ArrayList<ExecutionCommandWrapper>();
- this.commandsToSend.put(host, execCmdList);
+ this.commandsToSend.put(hostName, execCmdList);
}
-
+
if (execCmdList.contains(wrapper)) {
//todo: proper exception
throw new RuntimeException(
"Setting the execution command second time for same stage: stage="
- + this.getActionId() + ", host=" + host + ", role=" + role);
+ + this.getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event);
}
execCmdList.add(wrapper);
+ return wrapper;
+ }
+ /**
+ * A new host role command is created for execution.
+ * Creates both ExecutionCommand and HostRoleCommand objects and
+ * adds them to the Stage. This should be called only once for a host-role
+ * for a given stage.
+ */
+ public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command,
+ ServiceComponentHostEvent event, String clusterName, String serviceName) {
+ ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, host, role, command, event);
+
+ commandWrapper.getExecutionCommand().setServiceName(serviceName);
}
@@ -268,51 +279,33 @@ public class Stage {
* Creates server-side execution command. As of now, it seems to
* be used only for server upgrade
*/
- public synchronized void addServerActionCommand(
- String actionName, Role role, RoleCommand command, String clusterName,
+ public synchronized void addServerActionCommand(String actionName, Role role, RoleCommand command, String clusterName,
ServiceComponentHostUpgradeEvent event, String hostName) {
- //used on stage creation only, no need to check if wrappers loaded
- HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command);
- ExecutionCommand cmd = new ExecutionCommand();
- ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd);
- hrc.setExecutionCommandWrapper(wrapper);
- cmd.setHostname(hostName);
- cmd.setClusterName(clusterName);
- cmd.setServiceName("");
- cmd.setCommandId(this.getActionId());
- cmd.setRole(role.name());
- cmd.setRoleCommand(command);
-
+ ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, role, command, event);
+ ExecutionCommand cmd = commandWrapper.getExecutionCommand();
+
Map<String, String> roleParams = new HashMap<String, String>();
roleParams.put(ServerAction.ACTION_NAME, actionName);
cmd.setRoleParams(roleParams);
- Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
- if (hrcMap == null) {
- hrcMap = new LinkedHashMap<String, HostRoleCommand>();
- this.hostRoleCommands.put(hostName, hrcMap);
- }
- if (hrcMap.get(role.toString()) != null) {
- throw new RuntimeException(
- "Setting the server action the second time for same stage: stage="
- + this.getActionId() + ", action=" + actionName);
- }
- hrcMap.put(role.toString(), hrc);
- List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
- if (execCmdList == null) {
- execCmdList = new ArrayList<ExecutionCommandWrapper>();
- this.commandsToSend.put(hostName, execCmdList);
- }
-
- if (execCmdList.contains(wrapper)) {
- //todo: proper exception
- throw new RuntimeException(
- "Setting the execution command second time for same stage: stage="
- + this.getActionId() + ", action=" + actionName);
- }
- execCmdList.add(wrapper);
}
/**
+ * Adds cancel command to stage for given cancelTargets collection of task id's that has to be canceled in Agent layer.
+ */
+ public synchronized void addCancelRequestCommand(List<Long> cancelTargets, String clusterName, String hostName) {
+ ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, Role.AMBARI_SERVER_ACTION, RoleCommand.ABORT, null);
+ ExecutionCommand cmd = commandWrapper.getExecutionCommand();
+ cmd.setCommandType(AgentCommandType.CANCEL_COMMAND);
+
+ Assert.notEmpty(cancelTargets, "Provided targets task Id are empty.");
+
+ Map<String, String> roleParams = new HashMap<String, String>();
+
+ roleParams.put("cancelTaskIdTargets", StringUtils.join(cancelTargets, ','));
+ cmd.setRoleParams(roleParams);
+ }
+
+ /**
*
* @return list of hosts
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
index b9600dd..c65a496 100644
--- a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
+++ b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
@@ -49,14 +49,4 @@
<description>Validate if provided service config can be applied to specified hosts</description>
<targetType>ALL</targetType>
</actionDefinition>
- <actionDefinition>
- <actionName>cancel_background_task</actionName>
- <actionType>SYSTEM</actionType>
- <inputs></inputs>
- <targetService></targetService>
- <targetComponent></targetComponent>
- <defaultTimeout>60</defaultTimeout>
- <description>Cancel background task</description>
- <targetType>ANY</targetType>
- </actionDefinition>
</actionDefinitions>
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
deleted file mode 100644
index 9f9b1ea..0000000
--- a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Ambari Agent
-
-"""
-from resource_management import Script
-from ambari_agent import shell
-
-class CancelBackgroundTaskCommand(Script):
- def actionexecute(self, env):
- config = Script.get_config()
-
- cancel_command_pid = config['commandParams']['cancel_command_pid'] if config['commandParams'].has_key('cancel_command_pid') else None
- cancel_task_id = config['commandParams']['cancel_task_id']
- if cancel_command_pid == None:
- print "Nothing to cancel: there is no any task running with given taskId = '%s'" % cancel_task_id
- else:
- cancel_policy = config['commandParams']['cancel_policy']
- print "Send Kill to process pid = %s for task = %s with policy %s" % (cancel_command_pid, cancel_task_id, cancel_policy)
-
- shell.kill_process_with_children(cancel_command_pid)
- print "Process pid = %s for task = %s has been killed successfully" % (cancel_command_pid, cancel_task_id)
-
-if __name__ == "__main__":
- CancelBackgroundTaskCommand().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
index 3b320d1..f401122 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
@@ -74,14 +74,15 @@ class NameNode(Script):
namenode(action="decommission")
pass
+
def rebalancehdfs(self, env):
import params
env.set_params(params)
name_node_parameters = json.loads( params.name_node_params )
threshold = name_node_parameters['threshold']
- print "Starting balancer with threshold = %s" % threshold
-
+ _print("Starting balancer with threshold = %s\n" % threshold)
+
def calculateCompletePercent(first, current):
return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
@@ -97,7 +98,7 @@ class NameNode(Script):
basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator')
command = ['python','hdfs-command.py']
- print "Executing command %s" % command
+ _print("Executing command %s\n" % command)
parser = hdfs_rebalance.HdfsParser()
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -106,7 +107,7 @@ class NameNode(Script):
cwd=basedir
)
for line in iter(proc.stdout.readline, ''):
- sys.stdout.write('[balancer] %s %s' % (str(datetime.now()), line ))
+ _print('[balancer] %s %s' % (str(datetime.now()), line ))
pl = parser.parseLine(line)
if pl:
res = pl.toJson()
@@ -114,10 +115,14 @@ class NameNode(Script):
self.put_structured_out(res)
elif parser.state == 'PROCESS_FINISED' :
- sys.stdout.write('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
+ _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
self.put_structured_out({'completePercent' : 1})
break
+def _print(line):
+ sys.stdout.write(line)
+ sys.stdout.flush()
+
if __name__ == "__main__":
NameNode().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
index d49cadd..76b9fbc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
@@ -146,57 +146,6 @@ public class BackgroundCustomCommandExecutionTest {
Assert.fail(e.getMessage());
}
}
- @SuppressWarnings("serial")
- @Test
- public void testCancelCommand() {
- try {
- createClusterFixture();
-
- Map<String, String> requestProperties = new HashMap<String, String>() {
- {
- put(REQUEST_CONTEXT_PROPERTY, "Stop background command");
-// put("cancel_policy","SIGKILL");
-// put("cancel_task_id","19");
- }
- };
-
- ExecuteActionRequest actionRequest = new ExecuteActionRequest(
- "c1",
- "actionexecute","cancel_background_task",
- null,
- null,
- new HashMap<String, String>(){{
- put("cancel_policy","SIGKILL"); // parameters/cancel_policy -- in request params
- put("cancel_task_id","19");
- }});
- actionRequest.getResourceFilters().add(new RequestResourceFilter("HDFS", "NAMENODE", Collections.singletonList("c6401")));
-
- controller.createAction(actionRequest, requestProperties);
-
- Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
-
- List<Stage> stages = stagesCaptor.getValue();
- Assert.assertEquals(1, stages.size());
- Stage stage = stages.get(0);
-
- Assert.assertEquals(1, stage.getHosts().size());
-
- List<ExecutionCommandWrapper> commands = stage.getExecutionCommands("c6401");
- Assert.assertEquals(1, commands.size());
-
- ExecutionCommand command = commands.get(0).getExecutionCommand();
-
- Assert.assertEquals(AgentCommandType.EXECUTION_COMMAND, command.getCommandType());
- Assert.assertEquals("ACTIONEXECUTE", command.getRoleCommand().name());
- Assert.assertEquals("cancel_background_task.py", command.getCommandParams().get("script"));
- Assert.assertEquals("SIGKILL", command.getCommandParams().get("cancel_policy"));
- Assert.assertEquals("19", command.getCommandParams().get("cancel_task_id"));
-
-
- } catch (AmbariException e) {
- Assert.fail(e.getMessage());
- }
- }
private void createClusterFixture() throws AmbariException {
createCluster("c1");
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/controllers/main/service/item.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/controllers/main/service/item.js b/ambari-web/app/controllers/main/service/item.js
index f05940e..5593c72 100644
--- a/ambari-web/app/controllers/main/service/item.js
+++ b/ambari-web/app/controllers/main/service/item.js
@@ -280,7 +280,7 @@ App.MainServiceItemController = Em.Controller.extend({
errorMessage: Em.I18n.t('services.service.actions.run.rebalanceHdfsNodes.promptError'),
isInvalid: function () {
var intValue = Number(this.get('inputValue'));
- return isNaN(intValue) || intValue < 1 || intValue > 100;
+ return this.get('inputValue')!=='DEBUG' && (isNaN(intValue) || intValue < 1 || intValue > 100);
}.property('inputValue'),
disablePrimary : function() {
return this.get('isInvalid');
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/utils/ajax/ajax.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/utils/ajax/ajax.js b/ambari-web/app/utils/ajax/ajax.js
index 1cb7846..0de25d4 100644
--- a/ambari-web/app/utils/ajax/ajax.js
+++ b/ambari-web/app/utils/ajax/ajax.js
@@ -340,26 +340,20 @@ var urls = {
},
'cancel.background.operation' : {
- 'real' : '/clusters/{clusterName}/requests',
+ 'real' : '/clusters/{clusterName}/requests/{requestId}',
'mock' : '',
'format' : function(data) {
return {
- type : 'POST',
+ type : 'PUT',
data : JSON.stringify({
RequestInfo : {
- 'context' : 'Cancel background operation',
- 'action' : 'cancel_background_task',
+ 'context' : 'Cancel operation',
"parameters" : {
- "cancel_policy" : "SIGKILL",
- 'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
- "cancel_task_id" : data.cancelTaskId
+ "cancel_policy" : "SIGKILL"
}
},
- "Requests/resource_filters" : [ {
- "service_name" : data.serviceName,
- "component_name" : data.componentName,
- 'hosts' : data.hosts
- } ]
+ "Requests/request_status":'ABORTED',
+ "Requests/abort_reason": "Cancel background operation"
})
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/utils/host_progress_popup.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/utils/host_progress_popup.js b/ambari-web/app/utils/host_progress_popup.js
index bcd3563..0da534d 100644
--- a/ambari-web/app/utils/host_progress_popup.js
+++ b/ambari-web/app/utils/host_progress_popup.js
@@ -958,12 +958,7 @@ App.HostPopup = Em.Object.create({
name : 'cancel.background.operation',
sender : hostPopup,
data : {
- cancelTaskId : hostPopup.get('openedTaskId'),
- command : "REFRESHQUEUES",
- context : Em.I18n.t('services.service.actions.run.yarnRefreshQueues.context') ,
- hosts : App.Service.find('HDFS').get('hostComponents').findProperty('componentName', 'NAMENODE').get('hostName'),
- serviceName : "HDFS",
- componentName : "NAMENODE"
+ requestId : hostPopup.get('controller.currentServiceId')
}
});
hostPopup.backToServiceList();