You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2014/08/08 19:10:41 UTC

[4/4] git commit: AMBARI-5934. Provide ability to rebalance HDFS.

AMBARI-5934. Provide ability to rebalance HDFS.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cb662f49
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cb662f49
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cb662f49

Branch: refs/heads/trunk
Commit: cb662f494f27b0ffc2058151f550b0cb08572db1
Parents: 70588f5
Author: Mahadev Konar <ma...@apache.org>
Authored: Fri Aug 8 10:10:17 2014 -0700
Committer: Mahadev Konar <ma...@apache.org>
Committed: Fri Aug 8 10:10:21 2014 -0700

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 .../src/main/python/ambari_agent/ActionQueue.py |   84 +-
 .../BackgroundCommandExecutionHandle.py         |   44 +
 .../python/ambari_agent/CommandStatusDict.py    |   23 +-
 .../ambari_agent/CustomServiceOrchestrator.py   |   59 +-
 .../main/python/ambari_agent/PythonExecutor.py  |  128 ++-
 .../test/python/ambari_agent/TestActionQueue.py |  215 +++-
 .../TestCustomServiceOrchestrator.py            |   34 +
 ambari-server/pom.xml                           |    2 +
 .../server/actionmanager/ActionScheduler.java   |   66 +-
 .../ambari/server/agent/AgentCommand.java       |    1 +
 .../ambari/server/agent/HeartBeatHandler.java   |    1 +
 .../server/api/services/AmbariMetaInfo.java     |   23 +-
 .../server/api/util/StackExtensionHelper.java   |   14 +-
 .../AmbariCustomCommandExecutionHelper.java     |   14 +-
 .../ambari/server/state/ComponentInfo.java      |    8 +
 .../ambari/server/state/ConfigHelper.java       |   11 +-
 .../server/state/CustomCommandDefinition.java   |    7 +-
 .../apache/ambari/server/state/ServiceInfo.java |   14 +-
 .../system_action_definitions.xml               |   20 +-
 .../custom_actions/ambari_hdfs_rebalancer.py    |   59 -
 .../custom_actions/cancel_background_task.py    |   41 +
 .../stacks/HDP/2.0.6/services/HDFS/metainfo.xml |    8 +
 .../scripts/balancer-emulator/balancer-err.log  | 1032 ++++++++++++++++++
 .../scripts/balancer-emulator/balancer.log      |   29 +
 .../scripts/balancer-emulator/hdfs-command.py   |   45 +
 .../HDFS/package/scripts/hdfs_rebalance.py      |  130 +++
 .../services/HDFS/package/scripts/namenode.py   |   52 +
 .../services/HDFS/package/scripts/params.py     |    1 +
 .../actionmanager/TestActionScheduler.java      |   83 ++
 .../server/api/services/AmbariMetaInfoTest.java |   11 +-
 .../AmbariManagementControllerTest.java         |    3 +-
 .../BackgroundCustomCommandExecutionTest.java   |  275 +++++
 .../ActionDefinitionManagerTest.java            |   18 +-
 .../python/stacks/2.0.6/HDFS/test_namenode.py   |    7 +
 .../2.0.6/configs/rebalancehdfs_default.json    |  388 +++++++
 .../cust_action_definitions1.xml                |   10 -
 .../system_action_definitions.xml               |   32 -
 .../stacks/HDP/2.0.5/services/HDFS/metainfo.xml |    9 +
 .../global/background_operations_controller.js  |    6 +
 ambari-web/app/controllers/main/service/item.js |   75 +-
 ambari-web/app/messages.js                      |    8 +
 .../templates/common/host_progress_popup.hbs    |   19 +-
 .../app/templates/common/prompt_popup.hbs       |    5 +-
 ambari-web/app/utils/ajax/ajax.js               |   50 +
 ambari-web/app/utils/helper.js                  |    3 +
 ambari-web/app/utils/host_progress_popup.js     |   37 +-
 ambari-web/app/views/common/modal_popup.js      |    2 +
 ambari-web/app/views/main/service/item.js       |   11 +-
 49 files changed, 2942 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0430303..cff27da 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,4 @@ derby.log
 pass.txt
 ambari-agent/src/test/python/ambari_agent/dummy_files/current-stack
 velocity.log*
+*.pydevproject

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index d3aad6e..6437036 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -26,11 +26,13 @@ import pprint
 import os
 import json
 
+from AgentException import AgentException
 from LiveStatus import LiveStatus
 from shell import shellRunner
 from ActualConfigHandler import ActualConfigHandler
 from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
+from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
 
 
 logger = logging.getLogger()
@@ -52,9 +54,12 @@ class ActionQueue(threading.Thread):
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+  BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
+  CANCEL_BACKGROUND_EXECUTION_COMMAND = 'CANCEL_BACKGROUND_EXECUTION_COMMAND'
   ROLE_COMMAND_INSTALL = 'INSTALL'
   ROLE_COMMAND_START = 'START'
   ROLE_COMMAND_STOP = 'STOP'
+  ROLE_COMMAND_CANCEL = 'CANCEL'
   ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
   CUSTOM_COMMAND_RESTART = 'RESTART'
 
@@ -66,6 +71,7 @@ class ActionQueue(threading.Thread):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
     self.statusCommandQueue = Queue.Queue()
+    self.backgroundCommandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =
       self.status_update_callback)
     self.config = config
@@ -74,8 +80,7 @@ class ActionQueue(threading.Thread):
     self.configTags = {}
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
-    self.customServiceOrchestrator = CustomServiceOrchestrator(config,
-                                                               controller)
+    self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.commandStatuses)
 
 
   def stop(self):
@@ -106,7 +111,10 @@ class ActionQueue(threading.Thread):
                   command['serviceName'] + " of cluster " + \
                   command['clusterName'] + " to the queue.")
       logger.debug(pprint.pformat(command))
-      self.commandQueue.put(command)
+      if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
+        self.backgroundCommandQueue.put(self.createCommandHandle(command))
+      else:
+        self.commandQueue.put(command)
 
   def cancel(self, commands):
     for command in commands:
@@ -136,25 +144,45 @@ class ActionQueue(threading.Thread):
 
   def run(self):
     while not self.stopped():
-      while  not self.statusCommandQueue.empty():
-        try:
-          command = self.statusCommandQueue.get(False)
-          self.process_command(command)
-        except (Queue.Empty):
-          pass
+      self.processBackgroundQueueSafeEmpty();
+      self.processStatusCommandQueueSafeEmpty();
       try:
         command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
         self.process_command(command)
       except (Queue.Empty):
         pass
+  def processBackgroundQueueSafeEmpty(self):
+    while not self.backgroundCommandQueue.empty():  # drain without blocking
+      try:
+        command = self.backgroundCommandQueue.get(False)
+        if '__handle' in command and command['__handle'].status is None:  # status is None until started
+          self.process_command(command)
+      except Queue.Empty:  # queue emptied between empty() and get(); nothing to do
+        pass
+  
+  def processStatusCommandQueueSafeEmpty(self):
+    while not self.statusCommandQueue.empty():
+      try:
+        command = self.statusCommandQueue.get(False)
+        self.process_command(command)
+      except (Queue.Empty):
+        pass
+
+
+  def createCommandHandle(self, command):
+    if(command.has_key('__handle')):
+      raise AgentException("Command already has __handle")
+    command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'], self.on_background_command_started, self.on_background_command_complete_callback)
+    return command
 
   def process_command(self, command):
     logger.debug("Took an element of Queue: " + pprint.pformat(command))
     # make sure we log failures
+    commandType = command['commandType']
     try:
-      if command['commandType'] == self.EXECUTION_COMMAND:
+      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]:
         self.execute_command(command)
-      elif command['commandType'] == self.STATUS_COMMAND:
+      elif commandType == self.STATUS_COMMAND:
         self.execute_status_command(command)
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
@@ -165,11 +193,11 @@ class ActionQueue(threading.Thread):
 
   def execute_command(self, command):
     '''
-    Executes commands of type  EXECUTION_COMMAND
+    Executes commands of type EXECUTION_COMMAND
     '''
     clusterName = command['clusterName']
     commandId = command['commandId']
-
+    isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
     message = "Executing command with id = {commandId} for role = {role} of " \
               "cluster {cluster}.".format(
               commandId = str(commandId), role=command['role'],
@@ -189,13 +217,17 @@ class ActionQueue(threading.Thread):
       'status': self.IN_PROGRESS_STATUS
     })
     self.commandStatuses.put_command_status(command, in_progress_status)
+    
     # running command
     commandresult = self.customServiceOrchestrator.runCommand(command,
       in_progress_status['tmpout'], in_progress_status['tmperr'])
+   
+    
     # dumping results
-    status = self.COMPLETED_STATUS
-    if commandresult['exitcode'] != 0:
-      status = self.FAILED_STATUS
+    if isCommandBackground:
+      return
+    else:
+      status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else self.FAILED_STATUS  
     roleResult = self.commandStatuses.generate_report_template(command)
     roleResult.update({
       'stdout': commandresult['stdout'],
@@ -249,6 +281,26 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(command, roleResult)
 
+  def on_background_command_started(self, handle):
+    #update command with given handle
+    self.commandStatuses.update_command_status(handle.command, {'pid' : handle.pid})
+     
+     
+  def on_background_command_complete_callback(self, process_condenced_result, handle):
+    logger.debug('Start callback: %s' % process_condenced_result)
+    logger.debug('The handle is: %s' % handle)
+    status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
+    roleResult = self.commandStatuses.generate_report_template(handle.command)
+    
+    roleResult.update({
+      'stdout': process_condenced_result['stdout'],
+      'stderr': process_condenced_result['stderr'],
+      'exitCode': process_condenced_result['exitcode'],
+      'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
+      'status': status,
+    })
+    
+    self.commandStatuses.put_command_status(handle.command, roleResult)
 
   def execute_status_command(self, command):
     '''

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
new file mode 100644
index 0000000..17b7ce5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import logging
+
+logger = logging.getLogger()
+installScriptHash = -1
+
+class BackgroundCommandExecutionHandle:
+  """Tracks one background command: its pid, status, exit code and lifecycle callbacks."""
+  SCHEDULED_STATUS = 'SCHEDULED'
+  RUNNING_STATUS = 'RUNNING'
+  STOP_REQUEST_STATUS = 'STOP_REQUEST'
+  STOPPED_STATUS = 'STOPPED'
+  
+  def __init__(self, command, commandId, on_background_command_started, on_background_command_complete_callback):
+    self.command = command
+    self.pid = 0  # placeholder until the subprocess is launched
+    self.status = None  # None means the command has not been started yet
+    self.exitCode = None  # subprocess return code, unknown until it finishes
+    self.commandId = commandId
+    self.on_background_command_started = on_background_command_started
+    self.on_background_command_complete_callback = on_background_command_complete_callback
+
+
+
+  def __str__(self):
+    return "[BackgroundHandle: pid='{0}', status='{1}', exitCode='{2}', commandId='{3}']".format(self.pid, self.status, self.exitCode, self.commandId)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index f00ada2..0ebc45e 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -21,6 +21,7 @@ limitations under the License.
 import json
 import logging
 import threading
+import copy
 from Grep import Grep
 
 logger = logging.getLogger()
@@ -58,7 +59,25 @@ class CommandStatusDict():
     if not status_command:
       self.callback_action()
 
-
+  def update_command_status(self, command, delta):
+    """
+    Updates status of command without replacing (overwrites with delta value)
+    """
+    if 'taskId' in command:
+      key = command['taskId']
+      status_command = False
+    else: # Status command reports has no task id
+      key = id(command)
+      status_command = True
+    with self.lock: # Synchronized
+      self.current_state[key][1].update(delta)
+    if not status_command:
+      self.callback_action()
+  
+  def get_command_status(self, taskId):
+    with self.lock:
+      c = copy.copy(self.current_state[taskId][1])
+    return c
   def generate_report(self):
     """
     Generates status reports about commands that are IN_PROGRESS, COMPLETE or
@@ -72,7 +91,7 @@ class CommandStatusDict():
       for key, item in self.current_state.items():
         command = item[0]
         report = item[1]
-        if command ['commandType'] == ActionQueue.EXECUTION_COMMAND:
+        if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
           if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
             resultReports.append(report)
             # Removing complete/failed command status from dict

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index e13e543..093fc22 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -50,7 +50,7 @@ class CustomServiceOrchestrator():
   PING_PORTS_KEY = "all_ping_ports"
   AMBARI_SERVER_HOST = "ambari_server_host"
 
-  def __init__(self, config, controller):
+  def __init__(self, config, controller, commandStatuses = None):
     self.config = config
     self.tmp_dir = config.get('agent', 'prefix')
     self.exec_tmp_dir = config.get('agent', 'tmp_dir')
@@ -63,6 +63,8 @@ class CustomServiceOrchestrator():
     self.public_fqdn = hostname.public_hostname(config)
     # cache reset will be called on every agent registration
     controller.registration_listeners.append(self.file_cache.reset)
+    
+    self.commandStatuses = commandStatuses
     # Clean up old status command files if any
     try:
       os.unlink(self.status_commands_stdout)
@@ -93,6 +95,8 @@ class CustomServiceOrchestrator():
       script_type = command['commandParams']['script_type']
       script = command['commandParams']['script']
       timeout = int(command['commandParams']['command_timeout'])
+      before_interceptor_method = command['commandParams']['before_system_hook_function']  if command['commandParams'].has_key('before_system_hook_function') else None
+      
       if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
         server_url_prefix = command['hostLevelParams']['jdk_location']
       else:
@@ -110,6 +114,12 @@ class CustomServiceOrchestrator():
       if command_name == self.CUSTOM_ACTION_COMMAND:
         base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
         script_tuple = (os.path.join(base_dir, script) , base_dir)
+        
+        # Call systemHook functions in current virtual machine. This function can enrich custom action 
+        # command with some information from current machine. And can be considered as plugin
+        if before_interceptor_method != None: 
+          self.processSystemHookFunctions(script_tuple, before_interceptor_method, command)
+        
         hook_dir = None
       else:
         if command_name == self.CUSTOM_COMMAND_COMMAND:
@@ -127,6 +137,11 @@ class CustomServiceOrchestrator():
         message = "Unknown script type {0}".format(script_type)
         raise AgentException(message)
       # Execute command using proper interpreter
+      handle = None
+      if(command.has_key('__handle')):
+        handle = command['__handle']
+        del command['__handle']
+      
       json_path = self.dump_command_to_json(command)
       pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
           self.PRE_HOOK_PREFIX, command_name, script_type)
@@ -141,12 +156,16 @@ class CustomServiceOrchestrator():
       # Executing hooks and script
       ret = None
 
+      from ActionQueue import ActionQueue
+      if(command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1):
+        raise AgentException("Background commands are supported without hooks only")
+
       for py_file, current_base_dir in filtered_py_file_list:
         script_params = [command_name, json_path, current_base_dir]
         ret = self.python_executor.run_file(py_file, script_params,
                                self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
                                tmpstrucoutfile, logger_level, self.map_task_to_process,
-                               task_id, override_output_files)
+                               task_id, override_output_files, handle = handle)
         # Next run_file() invocations should always append to current output
         override_output_files = False
         if ret['exitcode'] != 0:
@@ -156,16 +175,17 @@ class CustomServiceOrchestrator():
         raise AgentException("No script has been executed")
 
       # if canceled
-      pid = self.commands_in_progress.pop(task_id)
-      if not isinstance(pid, int):
-        reason = '\nCommand aborted. ' + pid
-        ret['stdout'] += reason
-        ret['stderr'] += reason
-
-        with open(tmpoutfile, "a") as f:
-          f.write(reason)
-        with open(tmperrfile, "a") as f:
-          f.write(reason)
+      if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
+        pid = self.commands_in_progress.pop(task_id)
+        if not isinstance(pid, int):
+          reason = '\nCommand aborted. ' + pid
+          ret['stdout'] += reason
+          ret['stderr'] += reason
+  
+          with open(tmpoutfile, "a") as f:
+            f.write(reason)
+          with open(tmperrfile, "a") as f:
+            f.write(reason)
 
     except Exception: # We do not want to let agent fail completely
       exc_type, exc_obj, exc_tb = sys.exc_info()
@@ -180,7 +200,20 @@ class CustomServiceOrchestrator():
       }
     return ret
 
-
+  def fetch_bg_pid_by_taskid(self,command):
+    # Stores the PID recorded for commandParams['cancel_task_id'] into commandParams['cancel_command_pid'] (None if unknown).
+    cancel_command_pid = cancelTaskId = None  # pre-bind: the log line below also runs after a failed lookup
+    try:
+      cancelTaskId = int(command['commandParams']['cancel_task_id'])
+      cancel_command_pid = self.commandStatuses.get_command_status(cancelTaskId)['pid']
+    except Exception:  # best effort: a missing task or PID must not abort the cancel command
+      logger.exception("Could not resolve PID of the background task to cancel")
+    logger.info("Found PID=%s for cancel taskId=%s" % (cancel_command_pid,cancelTaskId))
+    command['commandParams']['cancel_command_pid'] = cancel_command_pid
+
+  def processSystemHookFunctions(self, script_tuple, before_interceptor_method, command):
+    getattr(self, before_interceptor_method)(command)
+    
   def requestComponentStatus(self, command):
     """
      Component status is determined by exit code, returned by runCommand().

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 704e8f3..d130497 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -24,6 +24,9 @@ import subprocess
 import pprint
 import threading
 from threading import Thread
+import time
+from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle 
+
 from Grep import Grep
 import shell, sys
 
@@ -36,7 +39,6 @@ class PythonExecutor:
   Warning: class maintains internal state. As a result, instances should not be
   used as a singleton for a concurrent execution of python scripts
   """
-
   NO_ERROR = "none"
   grep = Grep()
   event = threading.Event()
@@ -47,9 +49,19 @@ class PythonExecutor:
     self.config = config
     pass
 
+
+  def open_subporcess_files(self, tmpoutfile, tmperrfile, override_output_files):
+    if override_output_files: # Recreate files
+      tmpout =  open(tmpoutfile, 'w')
+      tmperr =  open(tmperrfile, 'w')
+    else: # Append to files
+      tmpout =  open(tmpoutfile, 'a')
+      tmperr =  open(tmperrfile, 'a')
+    return tmpout, tmperr
+    
   def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
                timeout, tmpstructedoutfile, logger_level, callback, task_id,
-               override_output_files = True):
+               override_output_files = True, handle = None):
     """
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
@@ -59,13 +71,6 @@ class PythonExecutor:
     override_output_files option defines whether stdout/stderr files will be
     recreated or appended
     """
-    if override_output_files: # Recreate files
-      tmpout =  open(tmpoutfile, 'w')
-      tmperr =  open(tmperrfile, 'w')
-    else: # Append to files
-      tmpout =  open(tmpoutfile, 'a')
-      tmperr =  open(tmperrfile, 'a')
-
     # need to remove this file for the following case:
     # status call 1 does not write to file; call 2 writes to file;
     # call 3 does not write to file, so contents are still call 2's result
@@ -77,45 +82,58 @@ class PythonExecutor:
     script_params += [tmpstructedoutfile, logger_level, tmp_dir]
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
-    process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
-    # map task_id to pid
-    callback(task_id, process.pid)
-    logger.debug("Launching watchdog thread")
-    self.event.clear()
-    self.python_process_has_been_killed = False
-    thread = Thread(target =  self.python_watchdog_func, args = (process, timeout))
-    thread.start()
-    # Waiting for the process to be either finished or killed
-    process.communicate()
-    self.event.set()
-    thread.join()
+    if(handle == None) :
+      tmpout, tmperr = self.open_subporcess_files(tmpoutfile, tmperrfile, override_output_files)
+      
+      process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
+      # map task_id to pid
+      callback(task_id, process.pid)
+      logger.debug("Launching watchdog thread")
+      self.event.clear()
+      self.python_process_has_been_killed = False
+      thread = Thread(target =  self.python_watchdog_func, args = (process, timeout))
+      thread.start()
+      # Waiting for the process to be either finished or killed
+      process.communicate()
+      self.event.set()
+      thread.join()
+      return self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile)
+    else:
+      holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
+      
+      background = BackgroundThread(holder, self)
+      background.start()
+      return {"exitcode": 777}
+
+  def prepare_process_result (self, process, tmpoutfile, tmperrfile, tmpstructedoutfile):
+    out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
     # Building results
-    error = self.NO_ERROR
     returncode = process.returncode
-    out = open(tmpoutfile, 'r').read()
-    error = open(tmperrfile, 'r').read()
 
+    if self.python_process_has_been_killed:
+      error = str(error) + "\n Python script has been killed due to timeout"
+      returncode = 999
+    result = self.condenseOutput(out, error, returncode, structured_out)
+    logger.info("Result: %s" % result)
+    return result
+  
+  def read_result_from_files(self, out_path, err_path, structured_out_path):
+    out = open(out_path, 'r').read()
+    error = open(err_path, 'r').read()
     try:
-      with open(tmpstructedoutfile, 'r') as fp:
+      with open(structured_out_path, 'r') as fp:
         structured_out = json.load(fp)
     except Exception:
-      if os.path.exists(tmpstructedoutfile):
-        errMsg = 'Unable to read structured output from ' + tmpstructedoutfile
+      if os.path.exists(structured_out_path):
+        errMsg = 'Unable to read structured output from ' + structured_out_path
         structured_out = {
           'msg' : errMsg
         }
         logger.warn(structured_out)
       else:
         structured_out = {}
-
-    if self.python_process_has_been_killed:
-      error = str(error) + "\n Python script has been killed due to timeout"
-      returncode = 999
-    result = self.condenseOutput(out, error, returncode, structured_out)
-    logger.info("Result: %s" % result)
-    return result
-
-
+    return out, error, structured_out
+  
   def launch_python_subprocess(self, command, tmpout, tmperr):
     """
     Creates subprocess with given parameters. This functionality was moved to separate method
@@ -124,7 +142,7 @@ class PythonExecutor:
     return subprocess.Popen(command,
       stdout=tmpout,
       stderr=tmperr, close_fds=True)
-
+    
   def isSuccessfull(self, returncode):
     return not self.python_process_has_been_killed and returncode == 0
 
@@ -153,3 +171,39 @@ class PythonExecutor:
       shell.kill_process_with_children(python.pid)
       self.python_process_has_been_killed = True
     pass
+
+class Holder:
+  def __init__(self, command, out_file, err_file, structured_out_file, handle):
+    self.command = command
+    self.out_file = out_file
+    self.err_file = err_file
+    self.structured_out_file = structured_out_file
+    self.handle = handle
+    
+class BackgroundThread(threading.Thread):
+  def __init__(self, holder, pythonExecutor):
+    threading.Thread.__init__(self)
+    self.holder = holder
+    self.pythonExecutor = pythonExecutor
+  
+  def run(self):
+    process_out, process_err  = self.pythonExecutor.open_subporcess_files(self.holder.out_file, self.holder.err_file, True)
+    
+    logger.info("Starting process command %s" % self.holder.command)
+    process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err)
+    
+    logger.info("Process has been started. Pid = %s" % process.pid)
+    
+    self.holder.handle.pid = process.pid
+    self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
+    self.holder.handle.on_background_command_started(self.holder.handle)
+    
+    process.communicate()
+    
+    self.holder.handle.exitCode = process.returncode
+    process_condenced_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
+    logger.info("Calling callback with args %s" % process_condenced_result)
+    self.holder.handle.on_background_command_complete_callback(process_condenced_result, self.holder.handle)
+    logger.info("Exiting from thread for holder pid %s" % self.holder.handle.pid)
+    
+  

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index e06efe4..4447670 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading, json
 import StringIO
 import sys
 from threading import Thread
+import copy
 
 from mock.mock import patch, MagicMock, call
 from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
@@ -34,13 +35,11 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.PythonExecutor import PythonExecutor
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from FileCache import FileCache
 
 
 class TestActionQueue(TestCase):
-
   def setUp(self):
-    out = StringIO.StringIO()
-    sys.stdout = out
     # save original open() method for later use
     self.original_open = open
 
@@ -155,6 +154,49 @@ class TestActionQueue(TestCase):
     'hostLevelParams': {}
   }
 
+  background_command = {
+    'commandType': 'BACKGROUND_EXECUTION_COMMAND',
+    'role': 'NAMENODE',
+    'roleCommand': 'CUSTOM_COMMAND',
+    'commandId': '1-1',
+    'taskId': 19,
+    'clusterName': 'c1',
+    'serviceName': 'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v123' }},
+    'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
+    'commandParams' :  {
+      'script_type' : 'PYTHON',
+      'script' : 'script.py',
+      'command_timeout' : '600',
+      'jdk_location' : '.',
+      'service_package_folder' : '.'
+      }
+  }
+  cancel_background_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': 'NAMENODE',
+    'roleCommand': 'ACTIONEXECUTE',
+    'commandId': '1-1',
+    'taskId': 20,
+    'clusterName': 'c1',
+    'serviceName': 'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : {}},
+    'hostLevelParams':{},
+    'commandParams' :  {
+      'script_type' : 'PYTHON',
+      'script' : 'cancel_background_task.py',
+      'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
+      'jdk_location' : '.',
+      'command_timeout' : '600',
+      'service_package_folder' : '.',
+      'cancel_policy': 'SIGKILL',
+      'cancel_task_id': "19",
+      }
+  }
+
+
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
@@ -526,3 +568,170 @@ class TestActionQueue(TestCase):
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
 
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_background_command(self, CustomServiceOrchestrator_mock,
+                                  runCommand_mock, read_stack_version_mock
+                                  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
+                                                         'stdout': 'out-11',
+                                                         'stderr' : 'err-13'}
+    
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+
+    execute_command = copy.deepcopy(self.background_command)
+    actionQueue.put([execute_command])
+    actionQueue.processBackgroundQueueSafeEmpty();
+    actionQueue.processStatusCommandQueueSafeEmpty();
+    
+    #assert that the python executor was started
+    self.assertTrue(runCommand_mock.called)
+    runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
+    self.assertTrue(runningCommand is not None)
+    self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
+    
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']),1)
+    
+
+      
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  @patch.object(FileCache, "__init__")
+  def test_execute_python_executor(self, read_stack_version_mock, FileCache_mock, resolve_script_path_mock):
+    FileCache_mock.return_value = None
+    
+    
+    dummy_controller = MagicMock()
+    cfg = AmbariConfig().getConfig()
+    cfg.set('agent', 'tolerate_download_failures', 'true')
+    cfg.set('agent', 'prefix', '.')
+    cfg.set('agent', 'cache_dir', 'background_tasks')
+    
+    actionQueue = ActionQueue(cfg, dummy_controller)
+    patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
+    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+   
+    result = {}
+    lock = threading.RLock()
+    complete_done = threading.Condition(lock)
+    start_done = threading.Condition(lock)
+    
+    def command_started_w(handle):
+      with lock:
+        result['command_started'] = {'handle': copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])}
+        start_done.notifyAll()
+    
+    def command_complete_w(process_condenced_result, handle):
+      with lock:
+        result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result), 
+                                      'handle' : copy.copy(handle),
+                                      'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
+                                      }
+        complete_done.notifyAll()
+    
+    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
+    actionQueue.on_background_command_started = wraped(actionQueue.on_background_command_started,None,command_started_w)
+    actionQueue.put([self.background_command])
+    actionQueue.processBackgroundQueueSafeEmpty();
+    actionQueue.processStatusCommandQueueSafeEmpty();
+    
+    with lock:
+      start_done.wait(5)
+      
+      self.assertTrue(result.has_key('command_started'), 'command started callback was not fired')
+      started_handle = result['command_started']['handle']
+      started_status = result['command_started']['command_status']
+      
+      self.assertEqual(started_handle.pid, started_status['pid'])
+      self.assertTrue(started_handle.pid > 0, "PID was not assigned to handle")
+      self.assertEqual(started_status['status'], ActionQueue.IN_PROGRESS_STATUS)
+      
+      complete_done.wait(2)
+      
+      finished_handle = result['command_complete']['handle']
+      self.assertEqual(started_handle.pid, finished_handle.pid)
+      finished_status = result['command_complete']['command_status']
+      self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
+      self.assertEqual(finished_status['stdout'], 'process_out')
+      self.assertEqual(finished_status['stderr'], 'process_err')
+      self.assertEqual(finished_status['exitCode'], 0)
+      
+    
+    runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
+    self.assertTrue(runningCommand is not None)
+    
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']),1)
+    self.assertEqual(report['reports'][0]['stdout'],'process_out')
+#     self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
+    
+        
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(FileCache, "__init__")
+  def test_cancel_backgound_command(self, read_stack_version_mock, FileCache_mock):
+    FileCache_mock.return_value = None
+    
+    dummy_controller = MagicMock()
+    cfg = AmbariConfig().getConfig()
+    cfg.set('agent', 'tolerate_download_failures', 'true')
+    cfg.set('agent', 'prefix', '.')
+    cfg.set('agent', 'cache_dir', 'background_tasks')
+    
+    actionQueue = ActionQueue(cfg, dummy_controller)
+    patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
+    actionQueue.customServiceOrchestrator.python_executor.prepare_process_result = MagicMock()
+    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+
+    lock = threading.RLock()
+    complete_done = threading.Condition(lock)
+    
+    def command_complete_w(process_condenced_result, handle):
+      with lock:
+        complete_done.wait(4)
+    
+    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
+    execute_command = copy.deepcopy(self.background_command)
+    actionQueue.put([execute_command])
+    actionQueue.processBackgroundQueueSafeEmpty();
+    
+    time.sleep(1)
+    
+    actionQueue.process_command(self.cancel_background_command)
+    #TODO add assert
+    
+    with lock:
+      complete_done.notifyAll()
+      
+      
+def patch_output_file(pythonExecutor):
+  def windows_py(command, tmpout, tmperr):
+    proc = MagicMock()
+    proc.pid = 33
+    proc.returncode = 0
+    with tmpout:
+      tmpout.write('process_out')
+    with tmperr:
+      tmperr.write('process_err')
+    return proc
+  def open_subporcess_files_win(fout, ferr, f):
+    return MagicMock(), MagicMock()
+  def read_result_from_files(out_path, err_path, structured_out_path):
+    return 'process_out', 'process_err', '{"a": "b."}'
+  pythonExecutor.launch_python_subprocess = windows_py
+  pythonExecutor.open_subporcess_files = open_subporcess_files_win
+  pythonExecutor.read_result_from_files = read_result_from_files
+    
+def wraped(func, before = None, after = None):
+    def wrapper(*args, **kwargs):
+      if(before is not None):
+        before(*args, **kwargs)
+      ret =  func(*args, **kwargs)
+      if(after is not None):
+        after(*args, **kwargs)
+      return ret
+    return wrapper   
+  

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index d669cd2..a1e1c66 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -39,6 +39,7 @@ import sys
 from AgentException import AgentException
 from FileCache import FileCache
 from LiveStatus import LiveStatus
+from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
 
 
 class TestCustomServiceOrchestrator(TestCase):
@@ -396,6 +397,39 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertEqual(runCommand_mock.return_value, status)
 
 
+  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+  @patch.object(FileCache, "__init__")
+  @patch.object(FileCache, "get_custom_actions_base_dir")
+  def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
+                                    FileCache_mock,
+                                    dump_command_to_json_mock):
+    FileCache_mock.return_value = None
+    get_custom_actions_base_dir_mock.return_value = "some path"
+    _, script = tempfile.mkstemp()
+    command = {
+      'role' : 'any',
+      'commandParams': {
+        'script_type': 'PYTHON',
+        'script': 'some_custom_action.py',
+        'command_timeout': '600',
+        'jdk_location' : 'some_location'
+      },
+      'taskId' : '13',
+      'roleCommand': 'ACTIONEXECUTE',
+      'commandType': 'BACKGROUND_EXECUTION_COMMAND',
+      '__handle' : BackgroundCommandExecutionHandle(None,13,MagicMock(), MagicMock())
+    }
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    
+    import TestActionQueue
+    TestActionQueue.patch_output_file(orchestrator.python_executor)
+    orchestrator.python_executor.condenseOutput = MagicMock()
+    orchestrator.dump_command_to_json = MagicMock()
+    
+    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
+    self.assertEqual(ret['exitcode'], 777)
+
   def tearDown(self):
     # enable stdout
     sys.stdout = sys.__stdout__

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 22aa454..1914c03 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -135,6 +135,8 @@
             <exclude>src/main/resources/db/serial</exclude>
             <exclude>src/main/resources/db/index.txt</exclude>
             <exclude>src/main/resources/stacks/HDP/2.1.GlusterFS/services/YARN/package/templates/exclude_hosts_list.j2</exclude>
+            <exclude>src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/balancer-emulator/balancer-err.log</exclude>
+            <exclude>src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/balancer-emulator/balancer.log</exclude>
             <exclude>conf/unix/ca.config</exclude>
             <exclude>conf/unix/krb5JAASLogin.conf</exclude>
             <exclude>**/*.json</exclude>

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 9e3f69c..cab891f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -31,18 +31,13 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.reflect.TypeToken;
-import com.google.inject.persist.UnitOfWork;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.CancelCommand;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
@@ -65,6 +60,13 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.reflect.TypeToken;
+import com.google.inject.persist.UnitOfWork;
+
 
 
 /**
@@ -193,7 +195,6 @@ class ActionScheduler implements Runnable {
       processCancelledRequestsList();
 
       Set<Long> runningRequestIds = new HashSet<Long>();
-      Set<String> affectedHosts = new HashSet<String>();
       List<Stage> stages = db.getStagesInProgress();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scheduler wakes up");
@@ -207,6 +208,10 @@ class ActionScheduler implements Runnable {
         return;
       }
       int i_stage = 0;
+      
+      
+      stages = filterParallelPerHostStages(stages);
+      
       for (Stage s : stages) {
         // Check if we can process this stage in parallel with another stages
         i_stage ++;
@@ -225,20 +230,7 @@ class ActionScheduler implements Runnable {
           }
         }
 
-        List<String> stageHosts = s.getHosts();
-        boolean conflict = false;
-        for (String host : stageHosts) {
-          if (affectedHosts.contains(host)) {
-            conflict = true;
-            break;
-          }
-        }
-        if (conflict) {
-          // Also we don't want to perform stages in parallel at the same hosts
-          continue;
-        } else {
-          affectedHosts.addAll(stageHosts);
-        }
+        
 
         // Commands that will be scheduled in current scheduler wakeup
         List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
@@ -354,6 +346,38 @@ class ActionScheduler implements Runnable {
     }
   }
 
+  /**
+   * Returns a filtered list of stages following these rules:
+   * 1) remove stages that share a host with an earlier stage: only the first stage touching a host is kept, later stages with any operation on that host are filtered out
+   * 2) do not remove a stage that shares a host with an earlier stage if the earlier stage runs only background commands on that host
+   * @param stages
+   * @return
+   */
+  private List<Stage> filterParallelPerHostStages(List<Stage> stages) {
+    List<Stage> retVal = new ArrayList<Stage>();
+    Set<String> affectedHosts = new HashSet<String>();
+    for(Stage s : stages){
+      for (String host : s.getHosts()) {
+        if (!affectedHosts.contains(host)) {
+          if(!isStageHasBackgroundCommandsOnly(s, host)){
+            affectedHosts.add(host);
+          }
+          retVal.add(s);
+        }
+      }
+    }
+    return retVal;
+  }
+
+  private boolean isStageHasBackgroundCommandsOnly(Stage s, String host) {
+    for (ExecutionCommandWrapper c : s.getExecutionCommands(host)) {
+      if(c.getExecutionCommand().getCommandType() != AgentCommandType.BACKGROUND_EXECUTION_COMMAND)
+      {
+        return false;
+      }
+    }
+    return true;
+  }
 
   /**
    * Executes internal ambari-server action

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
index 8703320..54faf6a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
@@ -31,6 +31,7 @@ public abstract class AgentCommand {
 
   public enum AgentCommandType {
     EXECUTION_COMMAND,
+    BACKGROUND_EXECUTION_COMMAND,
     STATUS_COMMAND,
     CANCEL_COMMAND,
     REGISTRATION_COMMAND

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index f2b5433..fa633c1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -563,6 +563,7 @@ public class HeartBeatHandler {
           throw new AmbariException("Could not get jaxb string for command", e);
         }
         switch (ac.getCommandType()) {
+          case BACKGROUND_EXECUTION_COMMAND: 
           case EXECUTION_COMMAND: {
             response.addExecutionCommand((ExecutionCommand) ac);
             break;

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index 4f9a8a4..80af575 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -402,15 +402,7 @@ public class AmbariMetaInfo {
   public boolean isValidServiceComponent(String stackName, String version,
                                          String serviceName, String componentName) throws AmbariException {
     ServiceInfo service = getServiceInfo(stackName, version, serviceName);
-    if (service == null) {
-      return false;
-    }
-    for (ComponentInfo compInfo : service.getComponents()) {
-      if (compInfo.getName().equals(componentName)) {
-        return true;
-      }
-    }
-    return false;
+    return service != null && service.getComponentByName(componentName) != null;
   }
 
   /**
@@ -436,17 +428,12 @@ public class AmbariMetaInfo {
         || services.isEmpty()) {
       return retService;
     }
-    boolean found = false;
     for (Map.Entry<String, ServiceInfo> entry : services.entrySet()) {
-      for (ComponentInfo compInfo : entry.getValue().getComponents()) {
-        if (compInfo.getName().equals(componentName)) {
-          retService = entry.getKey();
-          found = true;
-          break;
-        }
-      }
-      if (found)
+      ComponentInfo vu = entry.getValue().getComponentByName(componentName);
+      if(vu != null){
+        retService = entry.getKey();
         break;
+      }
     }
     return retService;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
index c702a45..65efa77 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
@@ -254,8 +254,7 @@ public class StackExtensionHelper {
 
     for (ComponentInfo childComponent : childService.getComponents()) {
       if (!childComponent.isDeleted()) {
-        ComponentInfo parentComponent = getComponent(parentService,
-                childComponent.getName());
+        ComponentInfo parentComponent = parentService.getComponentByName(childComponent.getName());
         if (parentComponent != null) { // If parent has similar component
           ComponentInfo mergedComponent = mergeComponents(parentComponent,
                   childComponent);
@@ -278,17 +277,6 @@ public class StackExtensionHelper {
     }
   }
 
-
-  private ComponentInfo getComponent(ServiceInfo service, String componentName) {
-    for (ComponentInfo component : service.getComponents()) {
-      if (component.getName().equals(componentName)) {
-        return component;
-      }
-    }
-    return null;
-  }
-
-
   ComponentInfo mergeComponents(ComponentInfo parent, ComponentInfo child) {
     ComponentInfo result = new ComponentInfo(child); // cloning child
     CommandScriptDefinition commandScript = child.getCommandScript();

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index cf628d9..339194f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -42,6 +42,7 @@ import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -53,6 +54,7 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.CommandScriptDefinition;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.CustomCommandDefinition;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostComponentAdminState;
 import org.apache.ambari.server.state.MaintenanceState;
@@ -236,7 +238,6 @@ public class AmbariCustomCommandExecutionHelper {
       throw new AmbariException(message);
     }
 
-
     StackId stackId = cluster.getDesiredStackVersion();
     AmbariMetaInfo ambariMetaInfo = managementController.getAmbariMetaInfo();
     ServiceInfo serviceInfo = ambariMetaInfo.getServiceInfo
@@ -244,6 +245,12 @@ public class AmbariCustomCommandExecutionHelper {
     StackInfo stackInfo = ambariMetaInfo.getStackInfo
       (stackId.getStackName(), stackId.getStackVersion());
 
+    CustomCommandDefinition customCommandDefinition = null;
+    ComponentInfo ci = serviceInfo.getComponentByName(componentName);
+    if(ci != null){
+      customCommandDefinition = ci.getCustomCommandByName(commandName);
+    }
+    
     long nowTimestamp = System.currentTimeMillis();
 
     for (String hostName : candidateHosts) {
@@ -271,6 +278,11 @@ public class AmbariCustomCommandExecutionHelper {
       ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
           componentName).getExecutionCommand();
 
+      //set type background
+      if(customCommandDefinition != null && customCommandDefinition.isBackground()){
+        execCmd.setCommandType(AgentCommandType.BACKGROUND_EXECUTION_COMMAND);
+      }
+      
       execCmd.setConfigurations(configurations);
       execCmd.setConfigurationAttributes(configurationAttributes);
       execCmd.setConfigurationTags(configTags);

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
index f8b952c..172b1ea 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
@@ -156,6 +156,14 @@ public class ComponentInfo {
     }
     return false;
   }
+  public CustomCommandDefinition getCustomCommandByName(String commandName){
+    for(CustomCommandDefinition ccd : getCustomCommands()){
+      if (ccd.getName().equals(commandName)){
+        return ccd;
+      }
+    }
+    return null;
+  }
 
   public List<DependencyInfo> getDependencies() {
     return dependencies;

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
index 481245c..1161cc6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
@@ -563,7 +563,7 @@ public class ConfigHelper {
     
     ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
         stackId.getStackVersion(), sch.getServiceName());
-    ComponentInfo componentInfo = getComponentInfo(serviceInfo,sch.getServiceComponentName());
+    ComponentInfo componentInfo = serviceInfo.getComponentByName(sch.getServiceComponentName());
     // Configs are considered stale when:
     // - desired type DOES NOT exist in actual
     // --- desired type DOES NOT exist in stack: not_stale
@@ -621,15 +621,6 @@ public class ConfigHelper {
     return stale;
   }
 
-  private ComponentInfo getComponentInfo(ServiceInfo serviceInfo, String componentName) {
-    for(ComponentInfo componentInfo : serviceInfo.getComponents()) {
-      if(componentInfo.getName().equals(componentName)){
-        return componentInfo;
-      }
-    }
-    return null;
-  }
-
   /**
    * @return <code>true</code> if any service on the stack defines a property
    * for the type.

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
index a26e7be..72eeb50 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
@@ -30,15 +30,20 @@ public class CustomCommandDefinition {
 
   private String name;
   private CommandScriptDefinition commandScript;
+  private boolean background = false;
 
   public String getName() {
     return name;
   }
+  
+  public boolean isBackground() {
+    return background;
+  }
 
   public CommandScriptDefinition getCommandScript() {
     return commandScript;
   }
-
+  
   @Override
   public boolean equals(Object obj) {
     if (obj == null) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
index f2d9fe3..ecc5c11 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
@@ -180,7 +180,19 @@ public class ServiceInfo {
     if (components == null) components = new ArrayList<ComponentInfo>();
     return components;
   }
-
+  /**
+   * Finds ComponentInfo by component name
+   * @param componentName
+   * @return the ComponentInfo whose name equals componentName, or null if there is none
+   */
+  public ComponentInfo getComponentByName(String componentName){
+    for(ComponentInfo componentInfo : getComponents()) {
+      if(componentInfo.getName().equals(componentName)){
+        return componentInfo;
+      }
+    }
+    return null;
+  }
   public boolean isClientOnlyService() {
     if (components == null || components.isEmpty()) {
       return false;

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
index 37ba394..b9600dd 100644
--- a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
+++ b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
@@ -20,16 +20,6 @@
 
 <actionDefinitions>
   <actionDefinition>
-    <actionName>ambari_hdfs_rebalancer</actionName>
-    <actionType>SYSTEM</actionType>
-    <inputs>threshold,[principal],[keytab]</inputs>
-    <targetService>HDFS</targetService>
-    <targetComponent>NAMENODE</targetComponent>
-    <defaultTimeout>600</defaultTimeout>
-    <description>HDFS Rebalance</description>
-    <targetType>ANY</targetType>
-  </actionDefinition>
-  <actionDefinition>
     <actionName>nagios_update_ignore</actionName>
     <actionType>SYSTEM</actionType>
     <inputs>[nagios_ignore]</inputs>
@@ -59,4 +49,14 @@
     <description>Validate if provided service config can be applied to specified hosts</description>
     <targetType>ALL</targetType>
   </actionDefinition>
+  <actionDefinition>
+    <actionName>cancel_background_task</actionName>
+    <actionType>SYSTEM</actionType>
+    <inputs></inputs>
+    <targetService></targetService>
+    <targetComponent></targetComponent>
+    <defaultTimeout>60</defaultTimeout>
+    <description>Cancel background task</description>
+    <targetType>ANY</targetType>
+  </actionDefinition>
 </actionDefinitions>

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py b/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py
deleted file mode 100644
index fd1e5d1..0000000
--- a/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#!/usr/bin/env python
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Ambari Agent
-
-"""
-
-from resource_management import *
-
-
-class HdfsRebalance(Script):
-  def actionexecute(self, env):
-    config = Script.get_config()
-
-    hdfs_user = config['configurations']['global']['hdfs_user']
-    conf_dir = "/etc/hadoop/conf"
-
-    _authentication = config['configurations']['core-site']['hadoop.security.authentication']
-    security_enabled = ( not is_empty(_authentication) and _authentication == 'kerberos')
-
-    threshold = config['commandParams']['threshold']
-
-    if security_enabled:
-      kinit_path_local = functions.get_kinit_path(
-        ["/usr/bin", "/usr/kerberos/bin", "/usr/sbin"])
-      principal = config['commandParams']['principal']
-      keytab = config['commandParams']['keytab']
-      Execute(format("{kinit_path_local}  -kt {keytab} {principal}"))
-
-    ExecuteHadoop(format('balancer -threshold {threshold}'),
-                  user=hdfs_user,
-                  conf_dir=conf_dir,
-                  logoutput=True
-    )
-
-    structured_output_example = {
-      'result': 'Rebalancer completed.'
-    }
-
-    self.put_structured_out(structured_output_example)
-
-
-if __name__ == "__main__":
-  HdfsRebalance().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
new file mode 100644
index 0000000..9f9b1ea
--- /dev/null
+++ b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+from resource_management import Script
+from ambari_agent import shell
+
+class CancelBackgroundTaskCommand(Script):
+  def actionexecute(self, env):
+    config = Script.get_config()
+
+    cancel_command_pid = config['commandParams']['cancel_command_pid'] if config['commandParams'].has_key('cancel_command_pid') else None
+    cancel_task_id = config['commandParams']['cancel_task_id']
+    if cancel_command_pid == None:
+      print "Nothing to cancel: there is no any task running with given taskId = '%s'" % cancel_task_id
+    else:
+      cancel_policy = config['commandParams']['cancel_policy']
+      print "Send Kill to process pid = %s for task = %s with policy %s" % (cancel_command_pid, cancel_task_id, cancel_policy)
+  
+      shell.kill_process_with_children(cancel_command_pid)
+      print "Process pid = %s for task = %s has been killed successfully" % (cancel_command_pid, cancel_task_id)
+    
+if __name__ == "__main__":
+  CancelBackgroundTaskCommand().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
index 7ac5e34..3d30e07 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
@@ -42,6 +42,14 @@
                 <timeout>600</timeout>
               </commandScript>
             </customCommand>
+            <customCommand>
+              <name>REBALANCEHDFS</name>
+              <background>true</background>
+              <commandScript>
+                <script>scripts/namenode.py</script>
+                <scriptType>PYTHON</scriptType>
+              </commandScript>
+            </customCommand>
           </customCommands>
         </component>