Posted to commits@ambari.apache.org by ma...@apache.org on 2014/08/13 00:18:20 UTC

git commit: AMBARI-6837. Cancel background tasks for Rebalancer should follow the usual cancel request paradigm.

Repository: ambari
Updated Branches:
  refs/heads/trunk 221f0e511 -> 2aee43d38


AMBARI-6837. Cancel background tasks for Rebalancer should follow the usual cancel request paradigm.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2aee43d3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2aee43d3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2aee43d3

Branch: refs/heads/trunk
Commit: 2aee43d386e36f8c4999409db02078eb36c08e04
Parents: 221f0e5
Author: Mahadev Konar <ma...@apache.org>
Authored: Tue Aug 12 14:47:24 2014 -0700
Committer: Mahadev Konar <ma...@apache.org>
Committed: Tue Aug 12 14:47:24 2014 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  26 +++--
 .../ambari_agent/CustomServiceOrchestrator.py   |  76 ++++++-------
 .../main/python/ambari_agent/PythonExecutor.py  |   2 +-
 .../test/python/ambari_agent/TestActionQueue.py |  73 +++----------
 .../TestCustomServiceOrchestrator.py            |  69 ++++++++++++
 .../ambari/server/actionmanager/Stage.java      | 107 +++++++++----------
 .../system_action_definitions.xml               |  10 --
 .../custom_actions/cancel_background_task.py    |  41 -------
 .../services/HDFS/package/scripts/namenode.py   |  15 ++-
 .../BackgroundCustomCommandExecutionTest.java   |  51 ---------
 ambari-web/app/controllers/main/service/item.js |   2 +-
 ambari-web/app/utils/ajax/ajax.js               |  18 ++--
 ambari-web/app/utils/host_progress_popup.js     |   7 +-
 13 files changed, 203 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 6437036..476955f 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -55,11 +55,9 @@ class ActionQueue(threading.Thread):
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
   BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
-  CANCEL_BACKGROUND_EXECUTION_COMMAND = 'CANCEL_BACKGROUND_EXECUTION_COMMAND'
   ROLE_COMMAND_INSTALL = 'INSTALL'
   ROLE_COMMAND_START = 'START'
   ROLE_COMMAND_STOP = 'STOP'
-  ROLE_COMMAND_CANCEL = 'CANCEL'
   ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
   CUSTOM_COMMAND_RESTART = 'RESTART'
 
@@ -80,7 +78,7 @@ class ActionQueue(threading.Thread):
     self.configTags = {}
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
-    self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.commandStatuses)
+    self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
 
 
   def stop(self):
@@ -172,7 +170,7 @@ class ActionQueue(threading.Thread):
   def createCommandHandle(self, command):
     if(command.has_key('__handle')):
       raise AgentException("Command already has __handle")
-    command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], self.on_background_command_started, self.on_background_command_complete_callback)
+    command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], None, self.on_background_command_complete_callback)
     return command
 
   def process_command(self, command):
@@ -281,20 +279,26 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(command, roleResult)
 
-  def on_background_command_started(self, handle):
-    #update command with given handle
-    self.commandStatuses.update_command_status(handle.command, {'pid' : handle.pid})
-     
-     
+  def command_was_canceled(self):
+    self.customServiceOrchestrator
   def on_background_command_complete_callback(self, process_condenced_result, handle):
     logger.debug('Start callback: %s' % process_condenced_result)
     logger.debug('The handle is: %s' % handle)
     status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
+    
+    aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId'])
+    if aborted_postfix:
+      status = self.FAILED_STATUS
+      logger.debug('Set status to: %s , reason = %s' % (status, aborted_postfix))
+    else:
+      aborted_postfix = ''
+      
+    
     roleResult = self.commandStatuses.generate_report_template(handle.command)
     
     roleResult.update({
-      'stdout': process_condenced_result['stdout'],
-      'stderr': process_condenced_result['stderr'],
+      'stdout': process_condenced_result['stdout'] + aborted_postfix,
+      'stderr': process_condenced_result['stderr'] + aborted_postfix,
       'exitCode': process_condenced_result['exitcode'],
       'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
       'status': status,
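
Note for readers skimming the agent side of this change: the dedicated CANCEL_BACKGROUND_EXECUTION_COMMAND / CANCEL role-command path is removed, and the completion callback instead asks the orchestrator whether the task was cancelled, forcing the report to FAILED and appending the abort reason when it was. A self-contained sketch of that decision (illustrative only, not the agent code itself; names mirror the diff above):

  COMPLETED_STATUS, FAILED_STATUS = 'COMPLETED', 'FAILED'

  def build_report(process_result, exit_code, cancel_reason):
      # Mirrors the decision made in on_background_command_complete_callback above.
      status = COMPLETED_STATUS if exit_code == 0 else FAILED_STATUS
      if cancel_reason:              # the orchestrator remembered an abort reason
          status = FAILED_STATUS     # a cancelled task is always reported as failed
      suffix = cancel_reason or ''
      return {'stdout': process_result['stdout'] + suffix,
              'stderr': process_result['stderr'] + suffix,
              'exitCode': exit_code,
              'status': status}

  # A task killed via cancel_command() exits non-zero and carries the reason:
  print(build_report({'stdout': 'out', 'stderr': ''}, 137, '\nCommand aborted. Aborted by user'))
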

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 093fc22..94aa87e 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -23,6 +23,7 @@ import os
 import json
 import sys
 import shell
+import threading
 
 from FileCache import FileCache
 from AgentException import AgentException
@@ -50,7 +51,7 @@ class CustomServiceOrchestrator():
   PING_PORTS_KEY = "all_ping_ports"
   AMBARI_SERVER_HOST = "ambari_server_host"
 
-  def __init__(self, config, controller, commandStatuses = None):
+  def __init__(self, config, controller):
     self.config = config
     self.tmp_dir = config.get('agent', 'prefix')
     self.exec_tmp_dir = config.get('agent', 'tmp_dir')
@@ -64,26 +65,31 @@ class CustomServiceOrchestrator():
     # cache reset will be called on every agent registration
     controller.registration_listeners.append(self.file_cache.reset)
     
-    self.commandStatuses = commandStatuses
     # Clean up old status command files if any
     try:
       os.unlink(self.status_commands_stdout)
       os.unlink(self.status_commands_stderr)
     except OSError:
       pass # Ignore fail
+    self.commands_in_progress_lock = threading.RLock()
     self.commands_in_progress = {}
 
   def map_task_to_process(self, task_id, processId):
-    self.commands_in_progress[task_id] = processId
+    with self.commands_in_progress_lock:
+      logger.debug('Maps taskId=%s to pid=%s'%(task_id, processId))
+      self.commands_in_progress[task_id] = processId
 
   def cancel_command(self, task_id, reason):
-    if task_id in self.commands_in_progress.keys():
-      pid = self.commands_in_progress.get(task_id)
-      self.commands_in_progress[task_id] = reason
-      logger.info("Canceling command with task_id - {tid}, " \
-                  "reason - {reason} . Killing process {pid}"
-      .format(tid = str(task_id), reason = reason, pid = pid))
-      shell.kill_process_with_children(pid)
+    with self.commands_in_progress_lock:
+      if task_id in self.commands_in_progress.keys():
+        pid = self.commands_in_progress.get(task_id)
+        self.commands_in_progress[task_id] = reason
+        logger.info("Canceling command with task_id - {tid}, " \
+                    "reason - {reason} . Killing process {pid}"
+        .format(tid = str(task_id), reason = reason, pid = pid))
+        shell.kill_process_with_children(pid)
+      else: 
+        logger.warn("Unable to find pid by taskId = %s"%task_id)
 
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
                  override_output_files = True):
@@ -95,7 +101,6 @@ class CustomServiceOrchestrator():
       script_type = command['commandParams']['script_type']
       script = command['commandParams']['script']
       timeout = int(command['commandParams']['command_timeout'])
-      before_interceptor_method = command['commandParams']['before_system_hook_function']  if command['commandParams'].has_key('before_system_hook_function') else None
       
       if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
         server_url_prefix = command['hostLevelParams']['jdk_location']
@@ -114,12 +119,6 @@ class CustomServiceOrchestrator():
       if command_name == self.CUSTOM_ACTION_COMMAND:
         base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
         script_tuple = (os.path.join(base_dir, script) , base_dir)
-        
-        # Call systemHook functions in current virtual machine. This function can enrich custom action 
-        # command with some information from current machine. And can be considered as plugin
-        if before_interceptor_method != None: 
-          self.processSystemHookFunctions(script_tuple, before_interceptor_method, command)
-        
         hook_dir = None
       else:
         if command_name == self.CUSTOM_COMMAND_COMMAND:
@@ -140,6 +139,7 @@ class CustomServiceOrchestrator():
       handle = None
       if(command.has_key('__handle')):
         handle = command['__handle']
+        handle.on_background_command_started = self.map_task_to_process
         del command['__handle']
       
       json_path = self.dump_command_to_json(command)
@@ -155,7 +155,6 @@ class CustomServiceOrchestrator():
 
       # Executing hooks and script
       ret = None
-
       from ActionQueue import ActionQueue
       if(command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1):
         raise AgentException("Background commands are supported without hooks only")
@@ -174,18 +173,17 @@ class CustomServiceOrchestrator():
       if not ret: # Something went wrong
         raise AgentException("No script has been executed")
 
-      # if canceled
-      if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
-        pid = self.commands_in_progress.pop(task_id)
-        if not isinstance(pid, int):
-          reason = '\nCommand aborted. ' + pid
-          ret['stdout'] += reason
-          ret['stderr'] += reason
+      # if canceled and not background command
+      if handle is None:
+        cancel_reason = self.command_canceled_reason(task_id)
+        if cancel_reason:
+          ret['stdout'] += cancel_reason
+          ret['stderr'] += cancel_reason
   
           with open(tmpoutfile, "a") as f:
-            f.write(reason)
+            f.write(cancel_reason)
           with open(tmperrfile, "a") as f:
-            f.write(reason)
+            f.write(cancel_reason)
 
     except Exception: # We do not want to let agent fail completely
       exc_type, exc_obj, exc_tb = sys.exc_info()
@@ -199,21 +197,15 @@ class CustomServiceOrchestrator():
         'exitcode': 1,
       }
     return ret
-
-  def fetch_bg_pid_by_taskid(self,command):
-    cancel_command_pid = None
-    try:
-      cancelTaskId = int(command['commandParams']['cancel_task_id'])
-      status = self.commandStatuses.get_command_status(cancelTaskId)
-      cancel_command_pid = status['pid']
-    except Exception:
-      pass
-    logger.info("Found PID=%s for cancel taskId=%s" % (cancel_command_pid,cancelTaskId))
-    command['commandParams']['cancel_command_pid'] = cancel_command_pid
-
-  def processSystemHookFunctions(self, script_tuple, before_interceptor_method, command):
-    getattr(self, before_interceptor_method)(command)
-    
+  def command_canceled_reason(self, task_id):
+    with self.commands_in_progress_lock:
+      if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
+        logger.debug('Pop with taskId %s' % task_id)
+        pid = self.commands_in_progress.pop(task_id)
+        if not isinstance(pid, int):
+          return '\nCommand aborted. ' + pid
+    return None
+        
   def requestComponentStatus(self, command):
     """
      Component status is determined by exit code, returned by runCommand().
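
The bookkeeping behind that check is the commands_in_progress map above, now guarded by an RLock: while a task runs it holds taskId -> pid; cancel_command() kills the process tree and overwrites the pid with the human-readable reason; command_canceled_reason() then pops the entry and reports a reason only when the stored value is no longer an int. A minimal, self-contained model of that lifecycle (a sketch, not the orchestrator itself; kill_process stands in for shell.kill_process_with_children):

  import threading

  class CancelBook(object):
      # Toy model of CustomServiceOrchestrator.commands_in_progress.
      def __init__(self):
          self._lock = threading.RLock()
          self._in_progress = {}   # taskId -> pid while running, -> reason once cancelled

      def map_task_to_process(self, task_id, pid):
          with self._lock:
              self._in_progress[task_id] = pid

      def cancel_command(self, task_id, reason, kill_process):
          with self._lock:
              if task_id in self._in_progress:
                  pid = self._in_progress[task_id]
                  self._in_progress[task_id] = reason   # remember why it was killed
                  kill_process(pid)                     # stand-in for shell.kill_process_with_children

      def command_canceled_reason(self, task_id):
          with self._lock:
              value = self._in_progress.pop(task_id, None)
          if value is None or isinstance(value, int):
              return None                               # finished normally, or unknown task
          return '\nCommand aborted. ' + value

  book = CancelBook()
  book.map_task_to_process(19, 33)
  book.cancel_command(19, 'Cancel background operation', kill_process=lambda pid: None)
  print(book.command_canceled_reason(19))

Holding the lock around both the lookup and the overwrite is what keeps a concurrent completion callback from racing the cancel.
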

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index d130497..874b70b 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -196,7 +196,7 @@ class BackgroundThread(threading.Thread):
     
     self.holder.handle.pid = process.pid
     self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
-    self.holder.handle.on_background_command_started(self.holder.handle)
+    self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid)
     
     process.communicate()
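
The pid reaches that map through the background handle: runCommand() above points handle.on_background_command_started at map_task_to_process, and BackgroundThread now invokes it with (taskId, pid) rather than passing the handle back to ActionQueue. A compressed illustration (Handle is a stand-in for BackgroundCommandExecutionHandle):

  commands_in_progress = {}          # plays the role of the orchestrator's map above

  class Handle(object):
      # Stand-in for BackgroundCommandExecutionHandle.
      def __init__(self, command):
          self.command = command
          self.pid = None
          self.on_background_command_started = None   # wired up in runCommand()

  handle = Handle({'taskId': 19})
  # runCommand(): handle.on_background_command_started = self.map_task_to_process
  handle.on_background_command_started = lambda task_id, pid: commands_in_progress.update({task_id: pid})
  # BackgroundThread.run(), once the subprocess exists, reports (taskId, pid):
  handle.pid = 33
  handle.on_background_command_started(handle.command['taskId'], handle.pid)
  print(commands_in_progress)        # {19: 33}
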
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 4447670..f582a68 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -618,12 +618,6 @@ class TestActionQueue(TestCase):
     result = {}
     lock = threading.RLock()
     complete_done = threading.Condition(lock)
-    start_done = threading.Condition(lock)
-    
-    def command_started_w(handle):
-      with lock:
-        result['command_started'] = {'handle': copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])}
-        start_done.notifyAll()
     
     def command_complete_w(process_condenced_result, handle):
       with lock:
@@ -634,26 +628,13 @@ class TestActionQueue(TestCase):
         complete_done.notifyAll()
     
     actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
-    actionQueue.on_background_command_started = wraped(actionQueue.on_background_command_started,None,command_started_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
     actionQueue.processStatusCommandQueueSafeEmpty();
     
     with lock:
-      start_done.wait(5)
-      
-      self.assertTrue(result.has_key('command_started'), 'command started callback was not fired')
-      started_handle = result['command_started']['handle']
-      started_status = result['command_started']['command_status']
-      
-      self.assertEqual(started_handle.pid, started_status['pid'])
-      self.assertTrue(started_handle.pid > 0, "PID was not assigned to handle")
-      self.assertEqual(started_status['status'], ActionQueue.IN_PROGRESS_STATUS)
-      
-      complete_done.wait(2)
+      complete_done.wait(.1)
       
-      finished_handle = result['command_complete']['handle']
-      self.assertEqual(started_handle.pid, finished_handle.pid)
       finished_status = result['command_complete']['command_status']
       self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
       self.assertEqual(finished_status['stdout'], 'process_out')
@@ -667,46 +648,24 @@ class TestActionQueue(TestCase):
     report = actionQueue.result()
     self.assertEqual(len(report['reports']),1)
     self.assertEqual(report['reports'][0]['stdout'],'process_out')
-#     self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
-    
-        
-  @patch.object(StackVersionsFileHandler, "read_stack_version")
-  @patch.object(FileCache, "__init__")
-  def test_cancel_backgound_command(self, read_stack_version_mock, FileCache_mock):
-    FileCache_mock.return_value = None
+#    self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
     
-    dummy_controller = MagicMock()
-    cfg = AmbariConfig().getConfig()
-    cfg.set('agent', 'tolerate_download_failures', 'true')
-    cfg.set('agent', 'prefix', '.')
-    cfg.set('agent', 'cache_dir', 'background_tasks')
     
-    actionQueue = ActionQueue(cfg, dummy_controller)
-    patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
-    actionQueue.customServiceOrchestrator.python_executor.prepare_process_result = MagicMock()
-    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+  
+  cancel_background_command = {
+    "commandType":"CANCEL_COMMAND",
+    "role":"AMBARI_SERVER_ACTION",
+    "roleCommand":"ABORT",
+    "commandId":"2--1",
+    "taskId":20,
+    "clusterName":"c1",
+    "serviceName":"",
+    "hostname":"c6401",
+    "roleParams":{
+      "cancelTaskIdTargets":"13,14"
+    },
+  }
 
-    lock = threading.RLock()
-    complete_done = threading.Condition(lock)
-    
-    def command_complete_w(process_condenced_result, handle):
-      with lock:
-        complete_done.wait(4)
-    
-    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
-    execute_command = copy.deepcopy(self.background_command)
-    actionQueue.put([execute_command])
-    actionQueue.processBackgroundQueueSafeEmpty();
-    
-    time.sleep(1)
-    
-    actionQueue.process_command(self.cancel_background_command)
-    #TODO add assert
-    
-    with lock:
-      complete_done.notifyAll()
-      
-      
 def patch_output_file(pythonExecutor):
   def windows_py(command, tmpout, tmperr):
     proc = MagicMock()

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index a1e1c66..92791e2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -40,6 +40,7 @@ from AgentException import AgentException
 from FileCache import FileCache
 from LiveStatus import LiveStatus
 from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
+from ambari_agent.ActionQueue import ActionQueue
 
 
 class TestCustomServiceOrchestrator(TestCase):
@@ -304,6 +305,74 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertTrue(os.path.exists(err))
     os.remove(out)
     os.remove(err)
+    
+  from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
+    
+  @patch("shell.kill_process_with_children")
+  @patch.object(FileCache, "__init__")
+  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,  
+                                      kill_process_with_children_mock):
+    FileCache_mock.return_value = None
+    FileCache_mock.cache_dir = MagicMock()
+    resolve_hook_script_path_mock.return_value = None
+#     shell.kill_process_with_children = MagicMock()
+    dummy_controller = MagicMock()
+    cfg = AmbariConfig().getConfig()
+    cfg.set('agent', 'tolerate_download_failures', 'true')
+    cfg.set('agent', 'prefix', '.')
+    cfg.set('agent', 'cache_dir', 'background_tasks')
+     
+    actionQueue = ActionQueue(cfg, dummy_controller)
+    
+    dummy_controller.actionQueue = actionQueue
+    orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
+    orchestrator.file_cache = MagicMock()
+    def f (a, b):
+      return ""
+    orchestrator.file_cache.get_service_base_dir = f
+    actionQueue.customServiceOrchestrator = orchestrator
+    
+    import TestActionQueue
+    import copy
+    
+    TestActionQueue.patch_output_file(orchestrator.python_executor)
+    orchestrator.python_executor.prepare_process_result = MagicMock()
+    orchestrator.dump_command_to_json = MagicMock()
+ 
+    lock = threading.RLock()
+    complete_done = threading.Condition(lock)
+    
+    complete_was_called = {}
+    def command_complete_w(process_condenced_result, handle):
+      with lock:
+        complete_was_called['visited']= ''
+        complete_done.wait(3)
+     
+    actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None) 
+    execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
+    actionQueue.put([execute_command])
+    actionQueue.processBackgroundQueueSafeEmpty()
+     
+    time.sleep(.1) 
+    
+    orchestrator.cancel_command(19,'')
+    self.assertTrue(kill_process_with_children_mock.called)
+    kill_process_with_children_mock.assert_called_with(33)
+     
+    with lock:
+      complete_done.notifyAll()
+
+    with lock:
+      self.assertTrue(complete_was_called.has_key('visited'))
+    
+    time.sleep(.1)
+     
+    runningCommand = actionQueue.commandStatuses.get_command_status(19)
+    self.assertTrue(runningCommand is not None)
+    self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)
 
 
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index 83d46eb..c4bbb46 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
@@ -38,10 +39,13 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.fsm.event.Event;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
@@ -216,51 +220,58 @@ public class Stage {
   public String getActionId() {
     return StageUtils.getActionId(requestId, getStageId());
   }
-
-  /**
-   * A new host role command is created for execution.
-   * Creates both ExecutionCommand and HostRoleCommand objects and
-   * adds them to the Stage. This should be called only once for a host-role
-   * for a given stage.
-   */
-  public synchronized void addHostRoleExecutionCommand(String host, Role role,  RoleCommand command,
-      ServiceComponentHostEvent event, String clusterName, String serviceName) {
+  
+  private synchronized ExecutionCommandWrapper addGenericExecutionCommand(String clusterName, String hostName, Role role, RoleCommand command, ServiceComponentHostEvent event){
     //used on stage creation only, no need to check if wrappers loaded
-    HostRoleCommand hrc = new HostRoleCommand(host, role, event, command);
+    HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command);
     ExecutionCommand cmd = new ExecutionCommand();
     ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd);
     hrc.setExecutionCommandWrapper(wrapper);
-    cmd.setHostname(host);
+    cmd.setHostname(hostName);
     cmd.setClusterName(clusterName);
-    cmd.setServiceName(serviceName);
     cmd.setCommandId(this.getActionId());
     cmd.setRole(role.name());
     cmd.setRoleCommand(command);
     
-    Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(host);
+    cmd.setServiceName("");
+    
+    Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
     if (hrcMap == null) {
       hrcMap = new LinkedHashMap<String, HostRoleCommand>();
-      this.hostRoleCommands.put(host, hrcMap);
+      this.hostRoleCommands.put(hostName, hrcMap);
     }
     if (hrcMap.get(role.toString()) != null) {
       throw new RuntimeException(
           "Setting the host role command second time for same stage: stage="
-              + this.getActionId() + ", host=" + host + ", role=" + role);
+              + this.getActionId() + ", host=" + hostName + ", role=" + role);
     }
     hrcMap.put(role.toString(), hrc);
-    List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(host);
+    List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
     if (execCmdList == null) {
       execCmdList = new ArrayList<ExecutionCommandWrapper>();
-      this.commandsToSend.put(host, execCmdList);
+      this.commandsToSend.put(hostName, execCmdList);
     }
-
+    
     if (execCmdList.contains(wrapper)) {
       //todo: proper exception
       throw new RuntimeException(
           "Setting the execution command second time for same stage: stage="
-              + this.getActionId() + ", host=" + host + ", role=" + role);
+              + this.getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event);
     }
     execCmdList.add(wrapper);
+    return wrapper;
+  }
+  /**
+   * A new host role command is created for execution.
+   * Creates both ExecutionCommand and HostRoleCommand objects and
+   * adds them to the Stage. This should be called only once for a host-role
+   * for a given stage.
+   */
+  public synchronized void addHostRoleExecutionCommand(String host, Role role,  RoleCommand command,
+      ServiceComponentHostEvent event, String clusterName, String serviceName) {
+    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, host, role, command, event);
+    
+    commandWrapper.getExecutionCommand().setServiceName(serviceName);
   }
 
 
@@ -268,51 +279,33 @@ public class Stage {
    *  Creates server-side execution command. As of now, it seems to
    *  be used only for server upgrade
    */
-  public synchronized void addServerActionCommand(
-      String actionName, Role role,  RoleCommand command, String clusterName,
+  public synchronized void addServerActionCommand(String actionName, Role role,  RoleCommand command, String clusterName,
       ServiceComponentHostUpgradeEvent event, String hostName) {
-    //used on stage creation only, no need to check if wrappers loaded
-    HostRoleCommand hrc = new HostRoleCommand(hostName, role, event, command);
-    ExecutionCommand cmd = new ExecutionCommand();
-    ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd);
-    hrc.setExecutionCommandWrapper(wrapper);
-    cmd.setHostname(hostName);
-    cmd.setClusterName(clusterName);
-    cmd.setServiceName("");
-    cmd.setCommandId(this.getActionId());
-    cmd.setRole(role.name());
-    cmd.setRoleCommand(command);
-
+    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, role, command, event);
+    ExecutionCommand cmd = commandWrapper.getExecutionCommand();
+    
     Map<String, String> roleParams = new HashMap<String, String>();
     roleParams.put(ServerAction.ACTION_NAME, actionName);
     cmd.setRoleParams(roleParams);
-    Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName);
-    if (hrcMap == null) {
-      hrcMap = new LinkedHashMap<String, HostRoleCommand>();
-      this.hostRoleCommands.put(hostName, hrcMap);
-    }
-    if (hrcMap.get(role.toString()) != null) {
-      throw new RuntimeException(
-          "Setting the server action the second time for same stage: stage="
-              + this.getActionId() + ", action=" + actionName);
-    }
-    hrcMap.put(role.toString(), hrc);
-    List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName);
-    if (execCmdList == null) {
-      execCmdList = new ArrayList<ExecutionCommandWrapper>();
-      this.commandsToSend.put(hostName, execCmdList);
-    }
-
-    if (execCmdList.contains(wrapper)) {
-      //todo: proper exception
-      throw new RuntimeException(
-          "Setting the execution command second time for same stage: stage="
-              + this.getActionId() + ", action=" + actionName);
-    }
-    execCmdList.add(wrapper);
   }
 
   /**
+   *  Adds cancel command to stage for given cancelTargets collection of task id's that has to be canceled in Agent layer.
+   */
+  public synchronized void addCancelRequestCommand(List<Long> cancelTargets, String clusterName, String hostName) {
+    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, Role.AMBARI_SERVER_ACTION, RoleCommand.ABORT, null);
+    ExecutionCommand cmd = commandWrapper.getExecutionCommand();
+    cmd.setCommandType(AgentCommandType.CANCEL_COMMAND);
+    
+    Assert.notEmpty(cancelTargets, "Provided targets task Id are empty.");
+    
+    Map<String, String> roleParams = new HashMap<String, String>();
+    
+    roleParams.put("cancelTaskIdTargets", StringUtils.join(cancelTargets, ','));
+    cmd.setRoleParams(roleParams);
+  }
+  
+  /**
    *
    * @return list of hosts
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
index b9600dd..c65a496 100644
--- a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
+++ b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
@@ -49,14 +49,4 @@
     <description>Validate if provided service config can be applied to specified hosts</description>
     <targetType>ALL</targetType>
   </actionDefinition>
-  <actionDefinition>
-    <actionName>cancel_background_task</actionName>
-    <actionType>SYSTEM</actionType>
-    <inputs></inputs>
-    <targetService></targetService>
-    <targetComponent></targetComponent>
-    <defaultTimeout>60</defaultTimeout>
-    <description>Cancel background task</description>
-    <targetType>ANY</targetType>
-  </actionDefinition>
 </actionDefinitions>

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
deleted file mode 100644
index 9f9b1ea..0000000
--- a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Ambari Agent
-
-"""
-from resource_management import Script
-from ambari_agent import shell
-
-class CancelBackgroundTaskCommand(Script):
-  def actionexecute(self, env):
-    config = Script.get_config()
-
-    cancel_command_pid = config['commandParams']['cancel_command_pid'] if config['commandParams'].has_key('cancel_command_pid') else None
-    cancel_task_id = config['commandParams']['cancel_task_id']
-    if cancel_command_pid == None:
-      print "Nothing to cancel: there is no any task running with given taskId = '%s'" % cancel_task_id
-    else:
-      cancel_policy = config['commandParams']['cancel_policy']
-      print "Send Kill to process pid = %s for task = %s with policy %s" % (cancel_command_pid, cancel_task_id, cancel_policy)
-  
-      shell.kill_process_with_children(cancel_command_pid)
-      print "Process pid = %s for task = %s has been killed successfully" % (cancel_command_pid, cancel_task_id)
-    
-if __name__ == "__main__":
-  CancelBackgroundTaskCommand().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
index 3b320d1..f401122 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/namenode.py
@@ -74,14 +74,15 @@ class NameNode(Script):
     namenode(action="decommission")
     pass
   
+    
   def rebalancehdfs(self, env):
     import params
     env.set_params(params)
 
     name_node_parameters = json.loads( params.name_node_params )
     threshold = name_node_parameters['threshold']
-    print "Starting balancer with threshold = %s" % threshold
-      
+    _print("Starting balancer with threshold = %s\n" % threshold)
+    
     def calculateCompletePercent(first, current):
       return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove
     
@@ -97,7 +98,7 @@ class NameNode(Script):
       basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator')
       command = ['python','hdfs-command.py']
     
-    print "Executing command %s" % command
+    _print("Executing command %s\n" % command)
     
     parser = hdfs_rebalance.HdfsParser()
     proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -106,7 +107,7 @@ class NameNode(Script):
                           cwd=basedir
                           )
     for line in iter(proc.stdout.readline, ''):
-      sys.stdout.write('[balancer] %s %s' % (str(datetime.now()), line ))
+      _print('[balancer] %s %s' % (str(datetime.now()), line ))
       pl = parser.parseLine(line)
       if pl:
         res = pl.toJson()
@@ -114,10 +115,14 @@ class NameNode(Script):
         
         self.put_structured_out(res)
       elif parser.state == 'PROCESS_FINISED' : 
-        sys.stdout.write('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
+        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' ))
         self.put_structured_out({'completePercent' : 1})
         break
       
       
+def _print(line):
+  sys.stdout.write(line)
+  sys.stdout.flush()
+
 if __name__ == "__main__":
   NameNode().execute()
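
The switch from bare print statements to _print here is about buffering: since the agent captures this script's stdout (it is not a terminal), Python block-buffers the stream, and without the explicit flush the '[balancer]' progress lines could sit in the buffer until the balancer exits instead of streaming out while the rebalance runs.
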

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
index d49cadd..76b9fbc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/BackgroundCustomCommandExecutionTest.java
@@ -146,57 +146,6 @@ public class BackgroundCustomCommandExecutionTest {
       Assert.fail(e.getMessage());
     }
   }
-  @SuppressWarnings("serial")
-  @Test
-  public void testCancelCommand() {
-    try {
-      createClusterFixture();
-      
-      Map<String, String> requestProperties = new HashMap<String, String>() {
-        {
-          put(REQUEST_CONTEXT_PROPERTY, "Stop background command");
-//          put("cancel_policy","SIGKILL");
-//          put("cancel_task_id","19");
-        }
-      };
-
-      ExecuteActionRequest actionRequest = new ExecuteActionRequest(
-          "c1", 
-          "actionexecute","cancel_background_task",
-          null,
-          null,
-          new HashMap<String, String>(){{
-            put("cancel_policy","SIGKILL"); // parameters/cancel_policy -- in request params
-            put("cancel_task_id","19");
-          }});
-      actionRequest.getResourceFilters().add(new RequestResourceFilter("HDFS", "NAMENODE", Collections.singletonList("c6401")));
-      
-      controller.createAction(actionRequest, requestProperties);
-      
-      Mockito.verify(am, Mockito.times(1)).sendActions(stagesCaptor.capture(), any(ExecuteActionRequest.class));
-      
-      List<Stage> stages = stagesCaptor.getValue();
-      Assert.assertEquals(1, stages.size());
-      Stage stage = stages.get(0);
-      
-      Assert.assertEquals(1, stage.getHosts().size());
-      
-      List<ExecutionCommandWrapper> commands = stage.getExecutionCommands("c6401");
-      Assert.assertEquals(1, commands.size());
-      
-      ExecutionCommand command = commands.get(0).getExecutionCommand();
-      
-      Assert.assertEquals(AgentCommandType.EXECUTION_COMMAND, command.getCommandType());
-      Assert.assertEquals("ACTIONEXECUTE", command.getRoleCommand().name());
-      Assert.assertEquals("cancel_background_task.py", command.getCommandParams().get("script"));
-      Assert.assertEquals("SIGKILL", command.getCommandParams().get("cancel_policy"));
-      Assert.assertEquals("19", command.getCommandParams().get("cancel_task_id"));
-      
-      
-    } catch (AmbariException e) {
-      Assert.fail(e.getMessage());
-    }
-  }
   
   private void createClusterFixture() throws AmbariException {
     createCluster("c1");

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/controllers/main/service/item.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/controllers/main/service/item.js b/ambari-web/app/controllers/main/service/item.js
index f05940e..5593c72 100644
--- a/ambari-web/app/controllers/main/service/item.js
+++ b/ambari-web/app/controllers/main/service/item.js
@@ -280,7 +280,7 @@ App.MainServiceItemController = Em.Controller.extend({
       errorMessage: Em.I18n.t('services.service.actions.run.rebalanceHdfsNodes.promptError'),
       isInvalid: function () {
         var intValue = Number(this.get('inputValue'));
-        return isNaN(intValue) || intValue < 1 || intValue > 100;
+        return this.get('inputValue')!=='DEBUG' && (isNaN(intValue) || intValue < 1 || intValue > 100);
       }.property('inputValue'),
       disablePrimary : function() {
         return this.get('isInvalid');

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/utils/ajax/ajax.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/utils/ajax/ajax.js b/ambari-web/app/utils/ajax/ajax.js
index 1cb7846..0de25d4 100644
--- a/ambari-web/app/utils/ajax/ajax.js
+++ b/ambari-web/app/utils/ajax/ajax.js
@@ -340,26 +340,20 @@ var urls = {
   },
   
   'cancel.background.operation' : {
-    'real' : '/clusters/{clusterName}/requests',
+    'real' : '/clusters/{clusterName}/requests/{requestId}',
     'mock' : '',
     'format' : function(data) {
       return {
-        type : 'POST',
+        type : 'PUT',
         data : JSON.stringify({
           RequestInfo : {
-            'context' : 'Cancel background operation',
-            'action'  : 'cancel_background_task',
+            'context' : 'Cancel operation',
             "parameters" : {
-              "cancel_policy"   : "SIGKILL",
-              'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
-              "cancel_task_id"  : data.cancelTaskId
+              "cancel_policy"   : "SIGKILL"
             }
           },
-          "Requests/resource_filters" : [ {
-            "service_name" : data.serviceName,
-            "component_name" : data.componentName,
-            'hosts' : data.hosts
-          } ]
+          "Requests/request_status":'ABORTED',
+          "Requests/abort_reason": "Cancel background operation"
         })
       }
     }
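
In other words, the web client no longer posts a cancel_background_task custom action; it issues the standard abort against the request resource. The same cancel can be exercised directly against the REST API along these lines (a sketch only; the host, credentials, and request id 19 are made-up examples, and the payload mirrors the format() body above):

  import json
  import requests   # third-party HTTP client, used purely for illustration

  AMBARI = 'http://ambari-host:8080/api/v1'   # hypothetical server address
  BODY = {
      'RequestInfo': {
          'context': 'Cancel operation',
          'parameters': {'cancel_policy': 'SIGKILL'},
      },
      'Requests/request_status': 'ABORTED',
      'Requests/abort_reason': 'Cancel background operation',
  }

  resp = requests.put(
      '%s/clusters/c1/requests/19' % AMBARI,   # URL shape: /clusters/{clusterName}/requests/{requestId}
      data=json.dumps(BODY),
      auth=('admin', 'admin'),                 # hypothetical credentials
      headers={'X-Requested-By': 'ambari'},    # header Ambari requires on non-GET calls
  )
  print(resp.status_code)
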

http://git-wip-us.apache.org/repos/asf/ambari/blob/2aee43d3/ambari-web/app/utils/host_progress_popup.js
----------------------------------------------------------------------
diff --git a/ambari-web/app/utils/host_progress_popup.js b/ambari-web/app/utils/host_progress_popup.js
index bcd3563..0da534d 100644
--- a/ambari-web/app/utils/host_progress_popup.js
+++ b/ambari-web/app/utils/host_progress_popup.js
@@ -958,12 +958,7 @@ App.HostPopup = Em.Object.create({
             name : 'cancel.background.operation',
               sender : hostPopup,
             data : {
-                cancelTaskId : hostPopup.get('openedTaskId'),
-              command : "REFRESHQUEUES",
-              context : Em.I18n.t('services.service.actions.run.yarnRefreshQueues.context') ,
-              hosts : App.Service.find('HDFS').get('hostComponents').findProperty('componentName', 'NAMENODE').get('hostName'),
-              serviceName : "HDFS",
-              componentName : "NAMENODE"
+              requestId : hostPopup.get('controller.currentServiceId')
             }
           });
             hostPopup.backToServiceList();