You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2014/08/08 19:10:41 UTC
[4/4] git commit: AMBARI-5934. Provide ability to rebalance HDFS.
AMBARI-5934. Provide ability to rebalance HDFS.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cb662f49
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cb662f49
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cb662f49
Branch: refs/heads/trunk
Commit: cb662f494f27b0ffc2058151f550b0cb08572db1
Parents: 70588f5
Author: Mahadev Konar <ma...@apache.org>
Authored: Fri Aug 8 10:10:17 2014 -0700
Committer: Mahadev Konar <ma...@apache.org>
Committed: Fri Aug 8 10:10:21 2014 -0700
----------------------------------------------------------------------
.gitignore | 1 +
.../src/main/python/ambari_agent/ActionQueue.py | 84 +-
.../BackgroundCommandExecutionHandle.py | 44 +
.../python/ambari_agent/CommandStatusDict.py | 23 +-
.../ambari_agent/CustomServiceOrchestrator.py | 59 +-
.../main/python/ambari_agent/PythonExecutor.py | 128 ++-
.../test/python/ambari_agent/TestActionQueue.py | 215 +++-
.../TestCustomServiceOrchestrator.py | 34 +
ambari-server/pom.xml | 2 +
.../server/actionmanager/ActionScheduler.java | 66 +-
.../ambari/server/agent/AgentCommand.java | 1 +
.../ambari/server/agent/HeartBeatHandler.java | 1 +
.../server/api/services/AmbariMetaInfo.java | 23 +-
.../server/api/util/StackExtensionHelper.java | 14 +-
.../AmbariCustomCommandExecutionHelper.java | 14 +-
.../ambari/server/state/ComponentInfo.java | 8 +
.../ambari/server/state/ConfigHelper.java | 11 +-
.../server/state/CustomCommandDefinition.java | 7 +-
.../apache/ambari/server/state/ServiceInfo.java | 14 +-
.../system_action_definitions.xml | 20 +-
.../custom_actions/ambari_hdfs_rebalancer.py | 59 -
.../custom_actions/cancel_background_task.py | 41 +
.../stacks/HDP/2.0.6/services/HDFS/metainfo.xml | 8 +
.../scripts/balancer-emulator/balancer-err.log | 1032 ++++++++++++++++++
.../scripts/balancer-emulator/balancer.log | 29 +
.../scripts/balancer-emulator/hdfs-command.py | 45 +
.../HDFS/package/scripts/hdfs_rebalance.py | 130 +++
.../services/HDFS/package/scripts/namenode.py | 52 +
.../services/HDFS/package/scripts/params.py | 1 +
.../actionmanager/TestActionScheduler.java | 83 ++
.../server/api/services/AmbariMetaInfoTest.java | 11 +-
.../AmbariManagementControllerTest.java | 3 +-
.../BackgroundCustomCommandExecutionTest.java | 275 +++++
.../ActionDefinitionManagerTest.java | 18 +-
.../python/stacks/2.0.6/HDFS/test_namenode.py | 7 +
.../2.0.6/configs/rebalancehdfs_default.json | 388 +++++++
.../cust_action_definitions1.xml | 10 -
.../system_action_definitions.xml | 32 -
.../stacks/HDP/2.0.5/services/HDFS/metainfo.xml | 9 +
.../global/background_operations_controller.js | 6 +
ambari-web/app/controllers/main/service/item.js | 75 +-
ambari-web/app/messages.js | 8 +
.../templates/common/host_progress_popup.hbs | 19 +-
.../app/templates/common/prompt_popup.hbs | 5 +-
ambari-web/app/utils/ajax/ajax.js | 50 +
ambari-web/app/utils/helper.js | 3 +
ambari-web/app/utils/host_progress_popup.js | 37 +-
ambari-web/app/views/common/modal_popup.js | 2 +
ambari-web/app/views/main/service/item.js | 11 +-
49 files changed, 2942 insertions(+), 276 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 0430303..cff27da 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,4 @@ derby.log
pass.txt
ambari-agent/src/test/python/ambari_agent/dummy_files/current-stack
velocity.log*
+*.pydevproject
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index d3aad6e..6437036 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -26,11 +26,13 @@ import pprint
import os
import json
+from AgentException import AgentException
from LiveStatus import LiveStatus
from shell import shellRunner
from ActualConfigHandler import ActualConfigHandler
from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
+from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
logger = logging.getLogger()
@@ -52,9 +54,12 @@ class ActionQueue(threading.Thread):
STATUS_COMMAND = 'STATUS_COMMAND'
EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+ BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
+ CANCEL_BACKGROUND_EXECUTION_COMMAND = 'CANCEL_BACKGROUND_EXECUTION_COMMAND'
ROLE_COMMAND_INSTALL = 'INSTALL'
ROLE_COMMAND_START = 'START'
ROLE_COMMAND_STOP = 'STOP'
+ ROLE_COMMAND_CANCEL = 'CANCEL'
ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
CUSTOM_COMMAND_RESTART = 'RESTART'
@@ -66,6 +71,7 @@ class ActionQueue(threading.Thread):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
self.statusCommandQueue = Queue.Queue()
+ self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = CommandStatusDict(callback_action =
self.status_update_callback)
self.config = config
@@ -74,8 +80,7 @@ class ActionQueue(threading.Thread):
self.configTags = {}
self._stop = threading.Event()
self.tmpdir = config.get('agent', 'prefix')
- self.customServiceOrchestrator = CustomServiceOrchestrator(config,
- controller)
+ self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller, self.commandStatuses)
def stop(self):
@@ -106,7 +111,10 @@ class ActionQueue(threading.Thread):
command['serviceName'] + " of cluster " + \
command['clusterName'] + " to the queue.")
logger.debug(pprint.pformat(command))
- self.commandQueue.put(command)
+ if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
+ self.backgroundCommandQueue.put(self.createCommandHandle(command))
+ else:
+ self.commandQueue.put(command)
def cancel(self, commands):
for command in commands:
@@ -136,25 +144,45 @@ class ActionQueue(threading.Thread):
def run(self):
while not self.stopped():
- while not self.statusCommandQueue.empty():
- try:
- command = self.statusCommandQueue.get(False)
- self.process_command(command)
- except (Queue.Empty):
- pass
+ self.processBackgroundQueueSafeEmpty();
+ self.processStatusCommandQueueSafeEmpty();
try:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
self.process_command(command)
except (Queue.Empty):
pass
+  def processBackgroundQueueSafeEmpty(self):
+    """
+    Drains backgroundCommandQueue without blocking. Every dequeued command
+    that carries a '__handle' whose status is still None is passed to
+    process_command(); any other dequeued command is dropped.
+    """
+    while not self.backgroundCommandQueue.empty():
+      try:
+        command = self.backgroundCommandQueue.get(False)
+        # 'in' and 'is None' replace the deprecated dict.has_key() and '== None'
+        if '__handle' in command and command['__handle'].status is None:
+          self.process_command(command)
+      except (Queue.Empty):
+        # non-blocking get() found nothing to take; the loop condition ends the drain
+        pass
+
+  def processStatusCommandQueueSafeEmpty(self):
+    """
+    Processes every command currently waiting in statusCommandQueue,
+    without blocking when the queue turns out to be empty.
+    """
+    pending = self.statusCommandQueue
+    while not pending.empty():
+      try:
+        status_command = pending.get(False)
+        self.process_command(status_command)
+      except (Queue.Empty):
+        pass
+
+
+  def createCommandHandle(self, command):
+    """
+    Attaches a new BackgroundCommandExecutionHandle to the command under the
+    '__handle' key and returns the same command dictionary.
+    Raises AgentException when the command already has a '__handle'.
+    """
+    if '__handle' in command:
+      raise AgentException("Command already has __handle")
+    handle = BackgroundCommandExecutionHandle(
+      command,
+      command['commandId'],
+      self.on_background_command_started,
+      self.on_background_command_complete_callback)
+    command['__handle'] = handle
+    return command
def process_command(self, command):
logger.debug("Took an element of Queue: " + pprint.pformat(command))
# make sure we log failures
+ commandType = command['commandType']
try:
- if command['commandType'] == self.EXECUTION_COMMAND:
+ if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]:
self.execute_command(command)
- elif command['commandType'] == self.STATUS_COMMAND:
+ elif commandType == self.STATUS_COMMAND:
self.execute_status_command(command)
else:
logger.error("Unrecognized command " + pprint.pformat(command))
@@ -165,11 +193,11 @@ class ActionQueue(threading.Thread):
def execute_command(self, command):
'''
- Executes commands of type EXECUTION_COMMAND
+ Executes commands of type EXECUTION_COMMAND
'''
clusterName = command['clusterName']
commandId = command['commandId']
-
+ isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
message = "Executing command with id = {commandId} for role = {role} of " \
"cluster {cluster}.".format(
commandId = str(commandId), role=command['role'],
@@ -189,13 +217,17 @@ class ActionQueue(threading.Thread):
'status': self.IN_PROGRESS_STATUS
})
self.commandStatuses.put_command_status(command, in_progress_status)
+
# running command
commandresult = self.customServiceOrchestrator.runCommand(command,
in_progress_status['tmpout'], in_progress_status['tmperr'])
+
+
# dumping results
- status = self.COMPLETED_STATUS
- if commandresult['exitcode'] != 0:
- status = self.FAILED_STATUS
+ if isCommandBackground:
+ return
+ else:
+ status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else self.FAILED_STATUS
roleResult = self.commandStatuses.generate_report_template(command)
roleResult.update({
'stdout': commandresult['stdout'],
@@ -249,6 +281,26 @@ class ActionQueue(threading.Thread):
self.commandStatuses.put_command_status(command, roleResult)
+  def on_background_command_started(self, handle):
+    """
+    Start callback of a background command: merges the pid stored on the
+    handle into the status entry of the handle's command.
+    """
+    pid_delta = {'pid' : handle.pid}
+    self.commandStatuses.update_command_status(handle.command, pid_delta)
+
+
+  def on_background_command_complete_callback(self, process_condenced_result, handle):
+    """
+    Completion callback of a background command. Builds the final report
+    from the condensed process result and stores it as the status of the
+    handle's command. The reported status is COMPLETED when handle.exitCode
+    is 0 and FAILED otherwise.
+    """
+    logger.debug('Start callback: %s' % process_condenced_result)
+    logger.debug('The handle is: %s' % handle)
+    if handle.exitCode == 0:
+      status = self.COMPLETED_STATUS
+    else:
+      status = self.FAILED_STATUS
+    roleResult = self.commandStatuses.generate_report_template(handle.command)
+
+    # structured output is reported as a JSON string, or '' when it is absent
+    if 'structuredOut' in process_condenced_result:
+      structured_out = str(json.dumps(process_condenced_result['structuredOut']))
+    else:
+      structured_out = ''
+
+    roleResult.update({
+      'stdout': process_condenced_result['stdout'],
+      'stderr': process_condenced_result['stderr'],
+      'exitCode': process_condenced_result['exitcode'],
+      'structuredOut': structured_out,
+      'status': status,
+    })
+
+    self.commandStatuses.put_command_status(handle.command, roleResult)
def execute_status_command(self, command):
'''
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
new file mode 100644
index 0000000..17b7ce5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/BackgroundCommandExecutionHandle.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import logging
+
+logger = logging.getLogger()
+installScriptHash = -1
+
+class BackgroundCommandExecutionHandle:
+  """
+  Bookkeeping object for one background command: the command itself, the
+  pid / status / exit code of its process and the callbacks to invoke when
+  the process starts and when it completes.
+  """
+
+  SCHEDULED_STATUS = 'SCHEDULED'
+  RUNNING_STATUS = 'RUNNING'
+  STOP_REQUEST_STATUS = 'STOP_REQUEST'
+  # was 'SCHEDULED' (copy-paste slip), which made a stopped command
+  # indistinguishable from a scheduled one
+  STOPPED_STATUS = 'STOPPED'
+
+  def __init__(self, command, commandId, on_background_command_started, on_background_command_complete_callback):
+    """
+    command   - the command dictionary this handle belongs to
+    commandId - id of that command
+    on_background_command_started           - callable invoked with the handle
+    on_background_command_complete_callback - callable invoked with the
+                                              condensed result and the handle
+    """
+    self.command = command
+    # pid, status and exitCode start out unset; they are filled in from outside
+    self.pid = 0
+    self.status = None
+    self.exitCode = None
+    self.commandId = commandId
+    self.on_background_command_started = on_background_command_started
+    self.on_background_command_complete_callback = on_background_command_complete_callback
+
+  def __str__(self):
+    return "[BackgroundHandle: pid='{0}', status='{1}', exitCode='{2}', commandId='{3}']".format(self.pid, self.status, self.exitCode, self.commandId)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index f00ada2..0ebc45e 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -21,6 +21,7 @@ limitations under the License.
import json
import logging
import threading
+import copy
from Grep import Grep
logger = logging.getLogger()
@@ -58,7 +59,25 @@ class CommandStatusDict():
if not status_command:
self.callback_action()
-
+  def update_command_status(self, command, delta):
+    """
+    Merges delta into the stored report of the given command instead of
+    replacing the report. Commands with a 'taskId' are keyed by it and
+    trigger callback_action(); commands without one (status commands) are
+    keyed by id(command) and do not trigger the callback.
+    """
+    has_task_id = 'taskId' in command
+    key = command['taskId'] if has_task_id else id(command)
+    with self.lock: # Synchronized
+      report = self.current_state[key][1]
+      report.update(delta)
+    if has_task_id:
+      self.callback_action()
+
+  def get_command_status(self, taskId):
+    """
+    Returns a shallow copy of the report stored for taskId, taken while
+    holding the lock.
+    """
+    with self.lock:
+      return copy.copy(self.current_state[taskId][1])
def generate_report(self):
"""
Generates status reports about commands that are IN_PROGRESS, COMPLETE or
@@ -72,7 +91,7 @@ class CommandStatusDict():
for key, item in self.current_state.items():
command = item[0]
report = item[1]
- if command ['commandType'] == ActionQueue.EXECUTION_COMMAND:
+ if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
resultReports.append(report)
# Removing complete/failed command status from dict
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index e13e543..093fc22 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -50,7 +50,7 @@ class CustomServiceOrchestrator():
PING_PORTS_KEY = "all_ping_ports"
AMBARI_SERVER_HOST = "ambari_server_host"
- def __init__(self, config, controller):
+ def __init__(self, config, controller, commandStatuses = None):
self.config = config
self.tmp_dir = config.get('agent', 'prefix')
self.exec_tmp_dir = config.get('agent', 'tmp_dir')
@@ -63,6 +63,8 @@ class CustomServiceOrchestrator():
self.public_fqdn = hostname.public_hostname(config)
# cache reset will be called on every agent registration
controller.registration_listeners.append(self.file_cache.reset)
+
+ self.commandStatuses = commandStatuses
# Clean up old status command files if any
try:
os.unlink(self.status_commands_stdout)
@@ -93,6 +95,8 @@ class CustomServiceOrchestrator():
script_type = command['commandParams']['script_type']
script = command['commandParams']['script']
timeout = int(command['commandParams']['command_timeout'])
+ before_interceptor_method = command['commandParams']['before_system_hook_function'] if command['commandParams'].has_key('before_system_hook_function') else None
+
if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
server_url_prefix = command['hostLevelParams']['jdk_location']
else:
@@ -110,6 +114,12 @@ class CustomServiceOrchestrator():
if command_name == self.CUSTOM_ACTION_COMMAND:
base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
script_tuple = (os.path.join(base_dir, script) , base_dir)
+
+ # Call systemHook functions in current virtual machine. This function can enrich custom action
+ # command with some information from current machine. And can be considered as plugin
+ if before_interceptor_method != None:
+ self.processSystemHookFunctions(script_tuple, before_interceptor_method, command)
+
hook_dir = None
else:
if command_name == self.CUSTOM_COMMAND_COMMAND:
@@ -127,6 +137,11 @@ class CustomServiceOrchestrator():
message = "Unknown script type {0}".format(script_type)
raise AgentException(message)
# Execute command using proper interpreter
+ handle = None
+ if(command.has_key('__handle')):
+ handle = command['__handle']
+ del command['__handle']
+
json_path = self.dump_command_to_json(command)
pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
self.PRE_HOOK_PREFIX, command_name, script_type)
@@ -141,12 +156,16 @@ class CustomServiceOrchestrator():
# Executing hooks and script
ret = None
+ from ActionQueue import ActionQueue
+ if(command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1):
+ raise AgentException("Background commands are supported without hooks only")
+
for py_file, current_base_dir in filtered_py_file_list:
script_params = [command_name, json_path, current_base_dir]
ret = self.python_executor.run_file(py_file, script_params,
self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
tmpstrucoutfile, logger_level, self.map_task_to_process,
- task_id, override_output_files)
+ task_id, override_output_files, handle = handle)
# Next run_file() invocations should always append to current output
override_output_files = False
if ret['exitcode'] != 0:
@@ -156,16 +175,17 @@ class CustomServiceOrchestrator():
raise AgentException("No script has been executed")
# if canceled
- pid = self.commands_in_progress.pop(task_id)
- if not isinstance(pid, int):
- reason = '\nCommand aborted. ' + pid
- ret['stdout'] += reason
- ret['stderr'] += reason
-
- with open(tmpoutfile, "a") as f:
- f.write(reason)
- with open(tmperrfile, "a") as f:
- f.write(reason)
+ if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
+ pid = self.commands_in_progress.pop(task_id)
+ if not isinstance(pid, int):
+ reason = '\nCommand aborted. ' + pid
+ ret['stdout'] += reason
+ ret['stderr'] += reason
+
+ with open(tmpoutfile, "a") as f:
+ f.write(reason)
+ with open(tmperrfile, "a") as f:
+ f.write(reason)
except Exception: # We do not want to let agent fail completely
exc_type, exc_obj, exc_tb = sys.exc_info()
@@ -180,7 +200,20 @@ class CustomServiceOrchestrator():
}
return ret
-
+  def fetch_bg_pid_by_taskid(self,command):
+    """
+    System hook function: looks up the pid stored in the status of the task
+    named by commandParams['cancel_task_id'] and writes it to
+    commandParams['cancel_command_pid']. The pid is None when it cannot be
+    determined; the lookup is best effort and never raises.
+    """
+    # Both names are bound up front so that the log line below cannot hit an
+    # unbound cancelTaskId when the lookup fails before it is assigned.
+    cancelTaskId = None
+    cancel_command_pid = None
+    try:
+      cancelTaskId = int(command['commandParams']['cancel_task_id'])
+      status = self.commandStatuses.get_command_status(cancelTaskId)
+      cancel_command_pid = status['pid']
+    except Exception:
+      # best effort: leave the pid as None, but do not hide the failure
+      logger.warn("Unable to determine PID for cancel taskId=%s" % cancelTaskId)
+    logger.info("Found PID=%s for cancel taskId=%s" % (cancel_command_pid,cancelTaskId))
+    command['commandParams']['cancel_command_pid'] = cancel_command_pid
+
+  def processSystemHookFunctions(self, script_tuple, before_interceptor_method, command):
+    """
+    Calls the method of this object whose name is given by
+    before_interceptor_method, passing it the command.
+    script_tuple is accepted but not used.
+    """
+    hook_function = getattr(self, before_interceptor_method)
+    hook_function(command)
+
def requestComponentStatus(self, command):
"""
Component status is determined by exit code, returned by runCommand().
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 704e8f3..d130497 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -24,6 +24,9 @@ import subprocess
import pprint
import threading
from threading import Thread
+import time
+from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
+
from Grep import Grep
import shell, sys
@@ -36,7 +39,6 @@ class PythonExecutor:
Warning: class maintains internal state. As a result, instances should not be
used as a singleton for a concurrent execution of python scripts
"""
-
NO_ERROR = "none"
grep = Grep()
event = threading.Event()
@@ -47,9 +49,19 @@ class PythonExecutor:
self.config = config
pass
+
+  def open_subporcess_files(self, tmpoutfile, tmperrfile, override_output_files):
+    """
+    Opens the stdout and stderr capture files and returns them, still open,
+    as a (tmpout, tmperr) tuple. The files are recreated when
+    override_output_files is true and opened for appending otherwise.
+    """
+    mode = 'w' if override_output_files else 'a'
+    return open(tmpoutfile, mode), open(tmperrfile, mode)
+
def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
timeout, tmpstructedoutfile, logger_level, callback, task_id,
- override_output_files = True):
+ override_output_files = True, handle = None):
"""
Executes the specified python file in a separate subprocess.
Method returns only when the subprocess is finished.
@@ -59,13 +71,6 @@ class PythonExecutor:
override_output_files option defines whether stdout/stderr files will be
recreated or appended
"""
- if override_output_files: # Recreate files
- tmpout = open(tmpoutfile, 'w')
- tmperr = open(tmperrfile, 'w')
- else: # Append to files
- tmpout = open(tmpoutfile, 'a')
- tmperr = open(tmperrfile, 'a')
-
# need to remove this file for the following case:
# status call 1 does not write to file; call 2 writes to file;
# call 3 does not write to file, so contents are still call 2's result
@@ -77,45 +82,58 @@ class PythonExecutor:
script_params += [tmpstructedoutfile, logger_level, tmp_dir]
pythonCommand = self.python_command(script, script_params)
logger.info("Running command " + pprint.pformat(pythonCommand))
- process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
- # map task_id to pid
- callback(task_id, process.pid)
- logger.debug("Launching watchdog thread")
- self.event.clear()
- self.python_process_has_been_killed = False
- thread = Thread(target = self.python_watchdog_func, args = (process, timeout))
- thread.start()
- # Waiting for the process to be either finished or killed
- process.communicate()
- self.event.set()
- thread.join()
+ if(handle == None) :
+ tmpout, tmperr = self.open_subporcess_files(tmpoutfile, tmperrfile, override_output_files)
+
+ process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
+ # map task_id to pid
+ callback(task_id, process.pid)
+ logger.debug("Launching watchdog thread")
+ self.event.clear()
+ self.python_process_has_been_killed = False
+ thread = Thread(target = self.python_watchdog_func, args = (process, timeout))
+ thread.start()
+ # Waiting for the process to be either finished or killed
+ process.communicate()
+ self.event.set()
+ thread.join()
+ return self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile)
+ else:
+ holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
+
+ background = BackgroundThread(holder, self)
+ background.start()
+ return {"exitcode": 777}
+
+ def prepare_process_result (self, process, tmpoutfile, tmperrfile, tmpstructedoutfile):
+ out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
# Building results
- error = self.NO_ERROR
returncode = process.returncode
- out = open(tmpoutfile, 'r').read()
- error = open(tmperrfile, 'r').read()
+ if self.python_process_has_been_killed:
+ error = str(error) + "\n Python script has been killed due to timeout"
+ returncode = 999
+ result = self.condenseOutput(out, error, returncode, structured_out)
+ logger.info("Result: %s" % result)
+ return result
+
+ def read_result_from_files(self, out_path, err_path, structured_out_path):
+ out = open(out_path, 'r').read()
+ error = open(err_path, 'r').read()
try:
- with open(tmpstructedoutfile, 'r') as fp:
+ with open(structured_out_path, 'r') as fp:
structured_out = json.load(fp)
except Exception:
- if os.path.exists(tmpstructedoutfile):
- errMsg = 'Unable to read structured output from ' + tmpstructedoutfile
+ if os.path.exists(structured_out_path):
+ errMsg = 'Unable to read structured output from ' + structured_out_path
structured_out = {
'msg' : errMsg
}
logger.warn(structured_out)
else:
structured_out = {}
-
- if self.python_process_has_been_killed:
- error = str(error) + "\n Python script has been killed due to timeout"
- returncode = 999
- result = self.condenseOutput(out, error, returncode, structured_out)
- logger.info("Result: %s" % result)
- return result
-
-
+ return out, error, structured_out
+
def launch_python_subprocess(self, command, tmpout, tmperr):
"""
Creates subprocess with given parameters. This functionality was moved to separate method
@@ -124,7 +142,7 @@ class PythonExecutor:
return subprocess.Popen(command,
stdout=tmpout,
stderr=tmperr, close_fds=True)
-
+
def isSuccessfull(self, returncode):
return not self.python_process_has_been_killed and returncode == 0
@@ -153,3 +171,39 @@ class PythonExecutor:
shell.kill_process_with_children(python.pid)
self.python_process_has_been_killed = True
pass
+
+class Holder:
+  """
+  Plain container for the pieces of one background run: the command to
+  launch, the paths of its stdout / stderr / structured output files and
+  the handle of the background command.
+  """
+  def __init__(self, command, out_file, err_file, structured_out_file, handle):
+    self.handle = handle
+    self.command = command
+    self.out_file, self.err_file = out_file, err_file
+    self.structured_out_file = structured_out_file
+
+class BackgroundThread(threading.Thread):
+  """
+  Thread that launches the command of a Holder as a subprocess, waits for
+  it to finish and reports start and completion through the callbacks of
+  the holder's handle.
+  """
+  def __init__(self, holder, pythonExecutor):
+    threading.Thread.__init__(self)
+    self.holder = holder
+    self.pythonExecutor = pythonExecutor
+
+  def run(self):
+    handle = self.holder.handle
+    process_out, process_err = self.pythonExecutor.open_subporcess_files(self.holder.out_file, self.holder.err_file, True)
+    try:
+      logger.info("Starting process command %s" % self.holder.command)
+      process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err)
+
+      logger.info("Process has been started. Pid = %s" % process.pid)
+
+      handle.pid = process.pid
+      handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
+      handle.on_background_command_started(handle)
+
+      process.communicate()
+    finally:
+      # The output files were only opened to be handed to the subprocess and
+      # the results are re-read by path below; close them here so that a
+      # background command does not leak two file descriptors.
+      process_out.close()
+      process_err.close()
+
+    handle.exitCode = process.returncode
+    process_condenced_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
+    logger.info("Calling callback with args %s" % process_condenced_result)
+    handle.on_background_command_complete_callback(process_condenced_result, handle)
+    logger.info("Exiting from thread for holder pid %s" % handle.pid)
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index e06efe4..4447670 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading, json
import StringIO
import sys
from threading import Thread
+import copy
from mock.mock import patch, MagicMock, call
from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
@@ -34,13 +35,11 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.PythonExecutor import PythonExecutor
from ambari_agent.CommandStatusDict import CommandStatusDict
from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from FileCache import FileCache
class TestActionQueue(TestCase):
-
def setUp(self):
- out = StringIO.StringIO()
- sys.stdout = out
# save original open() method for later use
self.original_open = open
@@ -155,6 +154,49 @@ class TestActionQueue(TestCase):
'hostLevelParams': {}
}
+ background_command = {
+ 'commandType': 'BACKGROUND_EXECUTION_COMMAND',
+ 'role': 'NAMENODE',
+ 'roleCommand': 'CUSTOM_COMMAND',
+ 'commandId': '1-1',
+ 'taskId': 19,
+ 'clusterName': 'c1',
+ 'serviceName': 'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v123' }},
+ 'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
+ 'commandParams' : {
+ 'script_type' : 'PYTHON',
+ 'script' : 'script.py',
+ 'command_timeout' : '600',
+ 'jdk_location' : '.',
+ 'service_package_folder' : '.'
+ }
+ }
+ cancel_background_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': 'NAMENODE',
+ 'roleCommand': 'ACTIONEXECUTE',
+ 'commandId': '1-1',
+ 'taskId': 20,
+ 'clusterName': 'c1',
+ 'serviceName': 'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : {}},
+ 'hostLevelParams':{},
+ 'commandParams' : {
+ 'script_type' : 'PYTHON',
+ 'script' : 'cancel_background_task.py',
+ 'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
+ 'jdk_location' : '.',
+ 'command_timeout' : '600',
+ 'service_package_folder' : '.',
+ 'cancel_policy': 'SIGKILL',
+ 'cancel_task_id': "19",
+ }
+ }
+
+
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
@@ -526,3 +568,170 @@ class TestActionQueue(TestCase):
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_background_command(self, CustomServiceOrchestrator_mock,
+ runCommand_mock, read_stack_version_mock
+ ):
+ # Queues a copy of background_command with CustomServiceOrchestrator.runCommand
+ # mocked, drains the queues, then checks that runCommand was called, that the
+ # task is recorded as IN_PROGRESS and that exactly one report is produced.
+ CustomServiceOrchestrator_mock.return_value = None
+ CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
+ 'stdout': 'out-11',
+ 'stderr' : 'err-13'}
+
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+
+ # deep copy so the shared class-level fixture is not handed to the queue
+ execute_command = copy.deepcopy(self.background_command)
+ actionQueue.put([execute_command])
+ actionQueue.processBackgroundQueueSafeEmpty();
+ actionQueue.processStatusCommandQueueSafeEmpty();
+
+ # assert that the python executor was started
+ self.assertTrue(runCommand_mock.called)
+ runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
+ self.assertTrue(runningCommand is not None)
+ self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
+
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']),1)
+
+
+
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
+ @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+ @patch.object(FileCache, "__init__")
+ def test_execute_python_executor(self, read_stack_version_mock, FileCache_mock, resolve_script_path_mock):
+ FileCache_mock.return_value = None
+
+
+ dummy_controller = MagicMock()
+ cfg = AmbariConfig().getConfig()
+ cfg.set('agent', 'tolerate_download_failures', 'true')
+ cfg.set('agent', 'prefix', '.')
+ cfg.set('agent', 'cache_dir', 'background_tasks')
+
+ actionQueue = ActionQueue(cfg, dummy_controller)
+ patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
+ actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+
+ result = {}
+ lock = threading.RLock()
+ complete_done = threading.Condition(lock)
+ start_done = threading.Condition(lock)
+
+ def command_started_w(handle):
+ with lock:
+ result['command_started'] = {'handle': copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])}
+ start_done.notifyAll()
+
+ def command_complete_w(process_condenced_result, handle):
+ with lock:
+ result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result),
+ 'handle' : copy.copy(handle),
+ 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
+ }
+ complete_done.notifyAll()
+
+ actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
+ actionQueue.on_background_command_started = wraped(actionQueue.on_background_command_started,None,command_started_w)
+ actionQueue.put([self.background_command])
+ actionQueue.processBackgroundQueueSafeEmpty();
+ actionQueue.processStatusCommandQueueSafeEmpty();
+
+ with lock:
+ start_done.wait(5)
+
+ self.assertTrue(result.has_key('command_started'), 'command started callback was not fired')
+ started_handle = result['command_started']['handle']
+ started_status = result['command_started']['command_status']
+
+ self.assertEqual(started_handle.pid, started_status['pid'])
+ self.assertTrue(started_handle.pid > 0, "PID was not assigned to handle")
+ self.assertEqual(started_status['status'], ActionQueue.IN_PROGRESS_STATUS)
+
+ complete_done.wait(2)
+
+ finished_handle = result['command_complete']['handle']
+ self.assertEqual(started_handle.pid, finished_handle.pid)
+ finished_status = result['command_complete']['command_status']
+ self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
+ self.assertEqual(finished_status['stdout'], 'process_out')
+ self.assertEqual(finished_status['stderr'], 'process_err')
+ self.assertEqual(finished_status['exitCode'], 0)
+
+
+ runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
+ self.assertTrue(runningCommand is not None)
+
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']),1)
+ self.assertEqual(report['reports'][0]['stdout'],'process_out')
+# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
+
+
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
+ @patch.object(FileCache, "__init__")
+ def test_cancel_backgound_command(self, read_stack_version_mock, FileCache_mock):
+ FileCache_mock.return_value = None
+
+ dummy_controller = MagicMock()
+ cfg = AmbariConfig().getConfig()
+ cfg.set('agent', 'tolerate_download_failures', 'true')
+ cfg.set('agent', 'prefix', '.')
+ cfg.set('agent', 'cache_dir', 'background_tasks')
+
+ actionQueue = ActionQueue(cfg, dummy_controller)
+ patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
+ actionQueue.customServiceOrchestrator.python_executor.prepare_process_result = MagicMock()
+ actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+
+ lock = threading.RLock()
+ complete_done = threading.Condition(lock)
+
+ def command_complete_w(process_condenced_result, handle):
+ with lock:
+ complete_done.wait(4)
+
+ actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
+ execute_command = copy.deepcopy(self.background_command)
+ actionQueue.put([execute_command])
+ actionQueue.processBackgroundQueueSafeEmpty();
+
+ time.sleep(1)
+
+ actionQueue.process_command(self.cancel_background_command)
+ #TODO add assert
+
+ with lock:
+ complete_done.notifyAll()
+
+
+def patch_output_file(pythonExecutor):
+ # Replaces three attributes of the given python executor with in-test stubs so
+ # that no real subprocess or output file handling takes place:
+ #   launch_python_subprocess -> windows_py
+ #   open_subporcess_files    -> open_subporcess_files_win
+ #   read_result_from_files   -> read_result_from_files
+
+ # Stub launcher: returns a mock process (pid 33, returncode 0) after writing
+ # 'process_out' / 'process_err' to the supplied output objects.
+ def windows_py(command, tmpout, tmperr):
+ proc = MagicMock()
+ proc.pid = 33
+ proc.returncode = 0
+ with tmpout:
+ tmpout.write('process_out')
+ with tmperr:
+ tmperr.write('process_err')
+ return proc
+ # Stub file opener: hands back two mocks in place of the out/err files.
+ def open_subporcess_files_win(fout, ferr, f):
+ return MagicMock(), MagicMock()
+ # Stub reader: fixed stdout, stderr and structured-output strings.
+ def read_result_from_files(out_path, err_path, structured_out_path):
+ return 'process_out', 'process_err', '{"a": "b."}'
+ pythonExecutor.launch_python_subprocess = windows_py
+ pythonExecutor.open_subporcess_files = open_subporcess_files_win
+ pythonExecutor.read_result_from_files = read_result_from_files
+
+def wraped(func, before = None, after = None):
+ # Returns a wrapper around func that calls before(*args, **kwargs) first and
+ # after(*args, **kwargs) last (each only when not None), passing both the same
+ # arguments as func, and returns func's own return value.
+ def wrapper(*args, **kwargs):
+ if(before is not None):
+ before(*args, **kwargs)
+ ret = func(*args, **kwargs)
+ if(after is not None):
+ after(*args, **kwargs)
+ return ret
+ return wrapper
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index d669cd2..a1e1c66 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -39,6 +39,7 @@ import sys
from AgentException import AgentException
from FileCache import FileCache
from LiveStatus import LiveStatus
+from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
class TestCustomServiceOrchestrator(TestCase):
@@ -396,6 +397,39 @@ class TestCustomServiceOrchestrator(TestCase):
self.assertEqual(runCommand_mock.return_value, status)
+ @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+ @patch.object(FileCache, "__init__")
+ @patch.object(FileCache, "get_custom_actions_base_dir")
+ def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
+ FileCache_mock,
+ dump_command_to_json_mock):
+ FileCache_mock.return_value = None
+ get_custom_actions_base_dir_mock.return_value = "some path"
+ _, script = tempfile.mkstemp()
+ command = {
+ 'role' : 'any',
+ 'commandParams': {
+ 'script_type': 'PYTHON',
+ 'script': 'some_custom_action.py',
+ 'command_timeout': '600',
+ 'jdk_location' : 'some_location'
+ },
+ 'taskId' : '13',
+ 'roleCommand': 'ACTIONEXECUTE',
+ 'commandType': 'BACKGROUND_EXECUTION_COMMAND',
+ '__handle' : BackgroundCommandExecutionHandle(None,13,MagicMock(), MagicMock())
+ }
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+
+ import TestActionQueue
+ TestActionQueue.patch_output_file(orchestrator.python_executor)
+ orchestrator.python_executor.condenseOutput = MagicMock()
+ orchestrator.dump_command_to_json = MagicMock()
+
+ ret = orchestrator.runCommand(command, "out.txt", "err.txt")
+ self.assertEqual(ret['exitcode'], 777)
+
def tearDown(self):
# enable stdout
sys.stdout = sys.__stdout__
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml
index 22aa454..1914c03 100644
--- a/ambari-server/pom.xml
+++ b/ambari-server/pom.xml
@@ -135,6 +135,8 @@
<exclude>src/main/resources/db/serial</exclude>
<exclude>src/main/resources/db/index.txt</exclude>
<exclude>src/main/resources/stacks/HDP/2.1.GlusterFS/services/YARN/package/templates/exclude_hosts_list.j2</exclude>
+ <exclude>src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/balancer-emulator/balancer-err.log</exclude>
+ <exclude>src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/balancer-emulator/balancer.log</exclude>
<exclude>conf/unix/ca.config</exclude>
<exclude>conf/unix/krb5JAASLogin.conf</exclude>
<exclude>**/*.json</exclude>
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 9e3f69c..cab891f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -31,18 +31,13 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.reflect.TypeToken;
-import com.google.inject.persist.UnitOfWork;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
@@ -65,6 +60,13 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.reflect.TypeToken;
+import com.google.inject.persist.UnitOfWork;
+
/**
@@ -193,7 +195,6 @@ class ActionScheduler implements Runnable {
processCancelledRequestsList();
Set<Long> runningRequestIds = new HashSet<Long>();
- Set<String> affectedHosts = new HashSet<String>();
List<Stage> stages = db.getStagesInProgress();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduler wakes up");
@@ -207,6 +208,10 @@ class ActionScheduler implements Runnable {
return;
}
int i_stage = 0;
+
+
+ stages = filterParallelPerHostStages(stages);
+
for (Stage s : stages) {
// Check if we can process this stage in parallel with another stages
i_stage ++;
@@ -225,20 +230,7 @@ class ActionScheduler implements Runnable {
}
}
- List<String> stageHosts = s.getHosts();
- boolean conflict = false;
- for (String host : stageHosts) {
- if (affectedHosts.contains(host)) {
- conflict = true;
- break;
- }
- }
- if (conflict) {
- // Also we don't want to perform stages in parallel at the same hosts
- continue;
- } else {
- affectedHosts.addAll(stageHosts);
- }
+
// Commands that will be scheduled in current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
@@ -354,6 +346,38 @@ class ActionScheduler implements Runnable {
}
}
+ /**
+  * Filters the stages so that no two returned stages run non-background
+  * commands on the same host.
+  * <p>
+  * Stages are visited in the given order:
+  * 1) a stage is dropped when any of its hosts is already occupied by a
+  *    previously accepted stage; otherwise it is accepted exactly once;
+  * 2) a host becomes occupied only when the accepted stage has at least one
+  *    command for it that is not a background command, so stages that are
+  *    background-only on a host never block later stages on that host.
+  *
+  * @param stages candidate stages, in scheduling order
+  * @return the accepted stages, each appearing at most once
+  */
+ private List<Stage> filterParallelPerHostStages(List<Stage> stages) {
+   List<Stage> retVal = new ArrayList<Stage>();
+   Set<String> affectedHosts = new HashSet<String>();
+   for (Stage s : stages) {
+     List<String> stageHosts = s.getHosts();
+
+     // Decide for the stage as a whole before touching affectedHosts, so a
+     // rejected stage neither reserves hosts nor ends up in the result.
+     boolean conflict = false;
+     for (String host : stageHosts) {
+       if (affectedHosts.contains(host)) {
+         conflict = true;
+         break;
+       }
+     }
+     if (conflict) {
+       continue;
+     }
+
+     for (String host : stageHosts) {
+       if (!isStageHasBackgroundCommandsOnly(s, host)) {
+         affectedHosts.add(host);
+       }
+     }
+     // Added once per stage, not once per host.
+     retVal.add(s);
+   }
+   return retVal;
+ }
+
+ /**
+  * Checks whether every execution command the stage holds for the given host
+  * has the command type BACKGROUND_EXECUTION_COMMAND.
+  *
+  * @param s    stage whose commands are inspected
+  * @param host host name used to select the stage's commands
+  * @return false as soon as a command of another type is found, true otherwise
+  *         (including when the stage has no commands for the host)
+  */
+ private boolean isStageHasBackgroundCommandsOnly(Stage s, String host) {
+   boolean backgroundOnly = true;
+   for (ExecutionCommandWrapper wrapper : s.getExecutionCommands(host)) {
+     AgentCommandType type = wrapper.getExecutionCommand().getCommandType();
+     if (type != AgentCommandType.BACKGROUND_EXECUTION_COMMAND) {
+       backgroundOnly = false;
+       break;
+     }
+   }
+   return backgroundOnly;
+ }
/**
* Executes internal ambari-server action
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
index 8703320..54faf6a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
@@ -31,6 +31,7 @@ public abstract class AgentCommand {
public enum AgentCommandType {
EXECUTION_COMMAND,
+ BACKGROUND_EXECUTION_COMMAND,
STATUS_COMMAND,
CANCEL_COMMAND,
REGISTRATION_COMMAND
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index f2b5433..fa633c1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -563,6 +563,7 @@ public class HeartBeatHandler {
throw new AmbariException("Could not get jaxb string for command", e);
}
switch (ac.getCommandType()) {
+ case BACKGROUND_EXECUTION_COMMAND:
case EXECUTION_COMMAND: {
response.addExecutionCommand((ExecutionCommand) ac);
break;
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index 4f9a8a4..80af575 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -402,15 +402,7 @@ public class AmbariMetaInfo {
public boolean isValidServiceComponent(String stackName, String version,
String serviceName, String componentName) throws AmbariException {
ServiceInfo service = getServiceInfo(stackName, version, serviceName);
- if (service == null) {
- return false;
- }
- for (ComponentInfo compInfo : service.getComponents()) {
- if (compInfo.getName().equals(componentName)) {
- return true;
- }
- }
- return false;
+ return service != null && service.getComponentByName(componentName) != null;
}
/**
@@ -436,17 +428,12 @@ public class AmbariMetaInfo {
|| services.isEmpty()) {
return retService;
}
- boolean found = false;
for (Map.Entry<String, ServiceInfo> entry : services.entrySet()) {
- for (ComponentInfo compInfo : entry.getValue().getComponents()) {
- if (compInfo.getName().equals(componentName)) {
- retService = entry.getKey();
- found = true;
- break;
- }
- }
- if (found)
+ ComponentInfo vu = entry.getValue().getComponentByName(componentName);
+ if(vu != null){
+ retService = entry.getKey();
break;
+ }
}
return retService;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
index c702a45..65efa77 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/util/StackExtensionHelper.java
@@ -254,8 +254,7 @@ public class StackExtensionHelper {
for (ComponentInfo childComponent : childService.getComponents()) {
if (!childComponent.isDeleted()) {
- ComponentInfo parentComponent = getComponent(parentService,
- childComponent.getName());
+ ComponentInfo parentComponent = parentService.getComponentByName(childComponent.getName());
if (parentComponent != null) { // If parent has similar component
ComponentInfo mergedComponent = mergeComponents(parentComponent,
childComponent);
@@ -278,17 +277,6 @@ public class StackExtensionHelper {
}
}
-
- private ComponentInfo getComponent(ServiceInfo service, String componentName) {
- for (ComponentInfo component : service.getComponents()) {
- if (component.getName().equals(componentName)) {
- return component;
- }
- }
- return null;
- }
-
-
ComponentInfo mergeComponents(ComponentInfo parent, ComponentInfo child) {
ComponentInfo result = new ComponentInfo(child); // cloning child
CommandScriptDefinition commandScript = child.getCommandScript();
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
index cf628d9..339194f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
@@ -42,6 +42,7 @@ import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -53,6 +54,7 @@ import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.CommandScriptDefinition;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.CustomCommandDefinition;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostComponentAdminState;
import org.apache.ambari.server.state.MaintenanceState;
@@ -236,7 +238,6 @@ public class AmbariCustomCommandExecutionHelper {
throw new AmbariException(message);
}
-
StackId stackId = cluster.getDesiredStackVersion();
AmbariMetaInfo ambariMetaInfo = managementController.getAmbariMetaInfo();
ServiceInfo serviceInfo = ambariMetaInfo.getServiceInfo
@@ -244,6 +245,12 @@ public class AmbariCustomCommandExecutionHelper {
StackInfo stackInfo = ambariMetaInfo.getStackInfo
(stackId.getStackName(), stackId.getStackVersion());
+ CustomCommandDefinition customCommandDefinition = null;
+ ComponentInfo ci = serviceInfo.getComponentByName(componentName);
+ if(ci != null){
+ customCommandDefinition = ci.getCustomCommandByName(commandName);
+ }
+
long nowTimestamp = System.currentTimeMillis();
for (String hostName : candidateHosts) {
@@ -271,6 +278,11 @@ public class AmbariCustomCommandExecutionHelper {
ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
componentName).getExecutionCommand();
+ //set type background
+ if(customCommandDefinition != null && customCommandDefinition.isBackground()){
+ execCmd.setCommandType(AgentCommandType.BACKGROUND_EXECUTION_COMMAND);
+ }
+
execCmd.setConfigurations(configurations);
execCmd.setConfigurationAttributes(configurationAttributes);
execCmd.setConfigurationTags(configTags);
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
index f8b952c..172b1ea 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ComponentInfo.java
@@ -156,6 +156,14 @@ public class ComponentInfo {
}
return false;
}
+ public CustomCommandDefinition getCustomCommandByName(String commandName){
+ for(CustomCommandDefinition ccd : getCustomCommands()){
+ if (ccd.getName().equals(commandName)){
+ return ccd;
+ }
+ }
+ return null;
+ }
public List<DependencyInfo> getDependencies() {
return dependencies;
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
index 481245c..1161cc6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
@@ -563,7 +563,7 @@ public class ConfigHelper {
ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
stackId.getStackVersion(), sch.getServiceName());
- ComponentInfo componentInfo = getComponentInfo(serviceInfo,sch.getServiceComponentName());
+ ComponentInfo componentInfo = serviceInfo.getComponentByName(sch.getServiceComponentName());
// Configs are considered stale when:
// - desired type DOES NOT exist in actual
// --- desired type DOES NOT exist in stack: not_stale
@@ -621,15 +621,6 @@ public class ConfigHelper {
return stale;
}
- private ComponentInfo getComponentInfo(ServiceInfo serviceInfo, String componentName) {
- for(ComponentInfo componentInfo : serviceInfo.getComponents()) {
- if(componentInfo.getName().equals(componentName)){
- return componentInfo;
- }
- }
- return null;
- }
-
/**
* @return <code>true</code> if any service on the stack defines a property
* for the type.
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
index a26e7be..72eeb50 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/CustomCommandDefinition.java
@@ -30,15 +30,20 @@ public class CustomCommandDefinition {
private String name;
private CommandScriptDefinition commandScript;
+ private boolean background = false;
public String getName() {
return name;
}
+
+ /**
+  * @return the background flag of this custom command definition
+  *         (the field is initialised to false)
+  */
+ public boolean isBackground() {
+ return background;
+ }
public CommandScriptDefinition getCommandScript() {
return commandScript;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj == null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
index f2d9fe3..ecc5c11 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
@@ -180,7 +180,19 @@ public class ServiceInfo {
if (components == null) components = new ArrayList<ComponentInfo>();
return components;
}
-
+ /**
+ * Finds ComponentInfo by component name
+ * @param componentName
+ * @return ComponentInfo componentName or null
+ */
+ public ComponentInfo getComponentByName(String componentName){
+ for(ComponentInfo componentInfo : getComponents()) {
+ if(componentInfo.getName().equals(componentName)){
+ return componentInfo;
+ }
+ }
+ return null;
+ }
public boolean isClientOnlyService() {
if (components == null || components.isEmpty()) {
return false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
index 37ba394..b9600dd 100644
--- a/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
+++ b/ambari-server/src/main/resources/custom_action_definitions/system_action_definitions.xml
@@ -20,16 +20,6 @@
<actionDefinitions>
<actionDefinition>
- <actionName>ambari_hdfs_rebalancer</actionName>
- <actionType>SYSTEM</actionType>
- <inputs>threshold,[principal],[keytab]</inputs>
- <targetService>HDFS</targetService>
- <targetComponent>NAMENODE</targetComponent>
- <defaultTimeout>600</defaultTimeout>
- <description>HDFS Rebalance</description>
- <targetType>ANY</targetType>
- </actionDefinition>
- <actionDefinition>
<actionName>nagios_update_ignore</actionName>
<actionType>SYSTEM</actionType>
<inputs>[nagios_ignore]</inputs>
@@ -59,4 +49,14 @@
<description>Validate if provided service config can be applied to specified hosts</description>
<targetType>ALL</targetType>
</actionDefinition>
+ <actionDefinition>
+ <actionName>cancel_background_task</actionName>
+ <actionType>SYSTEM</actionType>
+ <inputs></inputs>
+ <targetService></targetService>
+ <targetComponent></targetComponent>
+ <defaultTimeout>60</defaultTimeout>
+ <description>Cancel background task</description>
+ <targetType>ANY</targetType>
+ </actionDefinition>
</actionDefinitions>
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py b/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py
deleted file mode 100644
index fd1e5d1..0000000
--- a/ambari-server/src/main/resources/custom_actions/ambari_hdfs_rebalancer.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#!/usr/bin/env python
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Ambari Agent
-
-"""
-
-from resource_management import *
-
-
-class HdfsRebalance(Script):
- def actionexecute(self, env):
- config = Script.get_config()
-
- hdfs_user = config['configurations']['global']['hdfs_user']
- conf_dir = "/etc/hadoop/conf"
-
- _authentication = config['configurations']['core-site']['hadoop.security.authentication']
- security_enabled = ( not is_empty(_authentication) and _authentication == 'kerberos')
-
- threshold = config['commandParams']['threshold']
-
- if security_enabled:
- kinit_path_local = functions.get_kinit_path(
- ["/usr/bin", "/usr/kerberos/bin", "/usr/sbin"])
- principal = config['commandParams']['principal']
- keytab = config['commandParams']['keytab']
- Execute(format("{kinit_path_local} -kt {keytab} {principal}"))
-
- ExecuteHadoop(format('balancer -threshold {threshold}'),
- user=hdfs_user,
- conf_dir=conf_dir,
- logoutput=True
- )
-
- structured_output_example = {
- 'result': 'Rebalancer completed.'
- }
-
- self.put_structured_out(structured_output_example)
-
-
-if __name__ == "__main__":
- HdfsRebalance().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/cancel_background_task.py b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
new file mode 100644
index 0000000..9f9b1ea
--- /dev/null
+++ b/ambari-server/src/main/resources/custom_actions/cancel_background_task.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+from resource_management import Script
+from ambari_agent import shell
+
+class CancelBackgroundTaskCommand(Script):
+ def actionexecute(self, env):
+ config = Script.get_config()
+
+ cancel_command_pid = config['commandParams']['cancel_command_pid'] if config['commandParams'].has_key('cancel_command_pid') else None
+ cancel_task_id = config['commandParams']['cancel_task_id']
+ if cancel_command_pid == None:
+ print "Nothing to cancel: there is no any task running with given taskId = '%s'" % cancel_task_id
+ else:
+ cancel_policy = config['commandParams']['cancel_policy']
+ print "Send Kill to process pid = %s for task = %s with policy %s" % (cancel_command_pid, cancel_task_id, cancel_policy)
+
+ shell.kill_process_with_children(cancel_command_pid)
+ print "Process pid = %s for task = %s has been killed successfully" % (cancel_command_pid, cancel_task_id)
+
+if __name__ == "__main__":
+ CancelBackgroundTaskCommand().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/cb662f49/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
index 7ac5e34..3d30e07 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/metainfo.xml
@@ -42,6 +42,14 @@
<timeout>600</timeout>
</commandScript>
</customCommand>
+ <customCommand>
+ <name>REBALANCEHDFS</name>
+ <background>true</background>
+ <commandScript>
+ <script>scripts/namenode.py</script>
+ <scriptType>PYTHON</scriptType>
+ </commandScript>
+ </customCommand>
</customCommands>
</component>