You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/05/19 10:22:43 UTC
[2/2] ambari git commit: AMBARI-21056. Run execution commands sent to
/user/commands (aonishuk)
AMBARI-21056. Run execution commands sent to /user/commands (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c4c2ec79
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c4c2ec79
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c4c2ec79
Branch: refs/heads/branch-3.0-perf
Commit: c4c2ec79b69a5474ef262ba90ba427baacd80ae7
Parents: 2eb7844
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Fri May 19 13:22:40 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Fri May 19 13:22:40 2017 +0300
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 2 +
.../src/main/python/ambari_agent/ActionQueue.py | 61 +-
.../main/python/ambari_agent/AmbariConfig.py | 1 +
.../python/ambari_agent/CommandStatusDict.py | 48 +-
.../ambari_agent/CommandStatusReporter.py | 54 +
.../ambari_agent/ComponentStatusExecutor.py | 1 +
.../src/main/python/ambari_agent/Constants.py | 1 +
.../ambari_agent/CustomServiceOrchestrator.py | 6 +-
.../main/python/ambari_agent/HeartbeatThread.py | 4 +-
.../python/ambari_agent/InitializerModule.py | 6 +
.../main/python/ambari_agent/PythonExecutor.py | 5 +-
.../listeners/CommandsEventListener.py | 54 +
.../listeners/ConfigurationEventListener.py | 4 +-
.../python/ambari_agent/listeners/__init__.py | 6 +-
.../src/main/python/ambari_agent/main.py | 6 +
.../ambari_agent/TestAgentStompResponses.py | 35 +-
.../dummy_files/stomp/execution_commands.json | 12 +-
.../resources/Ambari-DDL-AzureDB-CREATE.sql | 2175 ++++++++++++++++++
18 files changed, 2402 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 441a01d..609f0fa 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -36,6 +36,8 @@ run_as_user=root
parallel_execution=0
alert_grace_period=5
status_command_timeout=5
+; Interval in seconds between periodic command output reports. 0 - don't report command output periodically (reduces bandwidth on big clusters)
+command_reports_interval=5
alert_kinit_timeout=14400000
system_resource_overrides=/etc/resource_overrides
; memory_threshold_soft_mb=400
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 1eda5c2..e9a3045 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
import Queue
-import multiprocessing
import logging
import traceback
@@ -28,11 +27,11 @@ import os
import ambari_simplejson as json
import time
import signal
+import copy
from AgentException import AgentException
from LiveStatus import LiveStatus
from ActualConfigHandler import ActualConfigHandler
-from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_commons.str_utils import split_on_chunks
@@ -73,40 +72,34 @@ class ActionQueue(threading.Thread):
COMPLETED_STATUS = 'COMPLETED'
FAILED_STATUS = 'FAILED'
- def __init__(self, config, controller):
+ def __init__(self, initializer_module):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
- self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
self.backgroundCommandQueue = Queue.Queue()
- self.commandStatuses = CommandStatusDict(callback_action =
- self.status_update_callback)
- self.config = config
- self.controller = controller
+ self.commandStatuses = initializer_module.commandStatuses
+ self.configurations_cache = initializer_module.configurations_cache
+ self.config = initializer_module.ambariConfig
self.configTags = {}
- self._stop = threading.Event()
- self.tmpdir = config.get('agent', 'prefix')
- self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
- self.parallel_execution = config.get_parallel_exec_option()
+ self.stop_event = initializer_module.stop_event
+ self.tmpdir = self.config.get('agent', 'prefix')
+ self.customServiceOrchestrator = CustomServiceOrchestrator(self.config)
+ self.parallel_execution = self.config.get_parallel_exec_option()
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
self.lock = threading.Lock()
- def stop(self):
- self._stop.set()
-
- def stopped(self):
- return self._stop.isSet()
-
- def put_status(self, commands):
- self.controller.statusCommandsExecutor.put_commands(commands)
-
def put(self, commands):
for command in commands:
if not command.has_key('serviceName'):
command['serviceName'] = "null"
if not command.has_key('clusterName'):
command['clusterName'] = 'null'
-
+
+ if command.has_key('clusterId'):
+ cluster_id = command['clusterId']
+ # TODO STOMP: what if has no configs yet?
+ if cluster_id != 'null':
+ command['configurations'] = dict(self.configurations_cache[str(cluster_id)])
logger.info("Adding " + command['commandType'] + " for role " + \
command['role'] + " for service " + \
command['serviceName'] + " of cluster " + \
@@ -144,9 +137,8 @@ class ActionQueue(threading.Thread):
def run(self):
try:
- while not self.stopped():
+ while not self.stop_event.is_set():
self.processBackgroundQueueSafeEmpty()
- self.controller.get_status_commands_executor().process_results() # process status commands
try:
if self.parallel_execution == 0:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -154,7 +146,7 @@ class ActionQueue(threading.Thread):
else:
# If parallel execution is enabled, just kick off all available
# commands using separate threads
- while (True):
+ while not self.stop_event.is_set():
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
# If command is not retry_enabled then do not start them in parallel
# checking just one command is enough as all commands for a stage is sent
@@ -203,12 +195,14 @@ class ActionQueue(threading.Thread):
try:
if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
try:
- if self.controller.recovery_manager.enabled():
- self.controller.recovery_manager.start_execution_command()
+ # TODO STOMP: fix recovery manager for execution commands
+ #if self.controller.recovery_manager.enabled():
+ # self.controller.recovery_manager.start_execution_command()
self.execute_command(command)
finally:
- if self.controller.recovery_manager.enabled():
- self.controller.recovery_manager.stop_execution_command()
+ pass
+ #if self.controller.recovery_manager.enabled():
+ # self.controller.recovery_manager.stop_execution_command()
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
@@ -380,6 +374,8 @@ class ActionQueue(threading.Thread):
# let recovery manager know the current state
if status == self.COMPLETED_STATUS:
+ # TODO STOMP:fix recovery_manager
+ """
if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
and self.controller.recovery_manager.configured_for_recovery(command['role']):
if command['roleCommand'] == self.ROLE_COMMAND_START:
@@ -400,7 +396,7 @@ class ActionQueue(threading.Thread):
self.controller.recovery_manager.update_config_staleness(command['role'], False)
logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command['role'] + " to " +
self.controller.recovery_manager.get_current_status(command['role']) )
- pass
+ """
# let ambari know that configuration tags were applied
configHandler = ActualConfigHandler(self.config, self.configTags)
@@ -437,6 +433,8 @@ class ActionQueue(threading.Thread):
roleResult['configurationTags'] = configHandler.read_actual_component(
command['role'])
elif status == self.FAILED_STATUS:
+ # TODO STOMP: recovery manager
+ """
if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
and self.controller.recovery_manager.configured_for_recovery(command['role']):
if command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
@@ -444,6 +442,7 @@ class ActionQueue(threading.Thread):
logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
", current state of " + command['role'] + " to " +
self.controller.recovery_manager.get_current_status(command['role']))
+ """
self.commandStatuses.put_command_status(command, roleResult)
@@ -504,6 +503,7 @@ class ActionQueue(threading.Thread):
'''
Executes commands of type STATUS_COMMAND
'''
+ # TODO STOMP: review if we need to run this with new status commands
try:
command, component_status_result = result
cluster = command['clusterName']
@@ -515,6 +515,7 @@ class ActionQueue(threading.Thread):
else:
globalConfig = {}
+ # TODO STOMP: check why we need this
if not Script.config :
logger.debug('Setting Script.config to last status command configuration')
Script.config = command
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index fe48870..3d480ca 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -46,6 +46,7 @@ ping_port=8670
cache_dir={ps}tmp
parallel_execution=0
system_resource_overrides={ps}etc{ps}resource_overrides
+tolerate_download_failures=false
[services]
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 7a97f3f..bb0cea3 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -18,12 +18,14 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import ambari_simplejson as json
import logging
import threading
import copy
+import json
from Grep import Grep
+from ambari_agent import Constants
+
logger = logging.getLogger()
class CommandStatusDict():
@@ -34,46 +36,29 @@ class CommandStatusDict():
task_id -> (command, cmd_report)
"""
- def __init__(self, callback_action):
+ def __init__(self, initializer_module):
"""
callback_action is called every time when status of some command is
updated
"""
self.current_state = {} # Contains all statuses
- self.callback_action = callback_action
self.lock = threading.RLock()
+ self.initializer_module = initializer_module
def put_command_status(self, command, new_report):
"""
Stores new version of report for command (replaces previous)
"""
- if 'taskId' in command:
- key = command['taskId']
- status_command = False
- else: # Status command reports has no task id
- key = id(command)
- status_command = True
+ key = command['taskId']
with self.lock: # Synchronized
self.current_state[key] = (command, new_report)
- if not status_command:
- self.callback_action()
- def update_command_status(self, command, delta):
- """
- Updates status of command without replacing (overwrites with delta value)
- """
- if 'taskId' in command:
- key = command['taskId']
- status_command = False
- else: # Status command reports has no task id
- key = id(command)
- status_command = True
- with self.lock: # Synchronized
- self.current_state[key][1].update(delta)
- if not status_command:
- self.callback_action()
-
+ self.force_update_to_server([new_report])
+
+ def force_update_to_server(self, reports):
+ self.initializer_module.connection.send(body=json.dumps(reports), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+
def get_command_status(self, taskId):
with self.lock:
c = copy.copy(self.current_state[taskId][1])
@@ -88,7 +73,6 @@ class CommandStatusDict():
from ActionQueue import ActionQueue
with self.lock: # Synchronized
resultReports = []
- resultComponentStatus = []
for key, item in self.current_state.items():
command = item[0]
report = item[1]
@@ -100,19 +84,11 @@ class CommandStatusDict():
else:
in_progress_report = self.generate_in_progress_report(command, report)
resultReports.append(in_progress_report)
- elif command ['commandType'] == ActionQueue.STATUS_COMMAND:
- resultComponentStatus.append(report)
- # Component status is useful once, removing it
- del self.current_state[key]
elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
del self.current_state[key]
pass
- result = {
- 'reports': resultReports,
- 'componentStatus': resultComponentStatus
- }
- return result
+ return resultReports
def generate_in_progress_report(self, command, report):
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
new file mode 100644
index 0000000..acee3b1
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import json
+import logging
+import threading
+
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class CommandStatusReporter(threading.Thread):
+ def __init__(self, initializer_module):
+ self.initializer_module = initializer_module
+ self.commandStatuses = initializer_module.commandStatuses
+ self.stop_event = initializer_module.stop_event
+ self.command_reports_interval = initializer_module.command_reports_interval
+ threading.Thread.__init__(self)
+
+ def run(self):
+ """
+ Run a loop which reports all the command results (IN_PROGRESS, FAILED, COMPLETED) every self.command_reports_interval seconds until stop_event is set. Returns immediately if the interval is 0.
+ """
+ if self.command_reports_interval == 0:
+ return
+
+ while not self.stop_event.is_set():
+ try:
+ # TODO STOMP: what if not registered?
+ report = self.commandStatuses.generate_report()
+ if report:
+ self.initializer_module.connection.send(body=json.dumps(report), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+ self.stop_event.wait(self.command_reports_interval)
+ except:
+ logger.exception("Exception in CommandStatusReporter. Re-running it")
+ pass
+ logger.info("CommandStatusReporter has successfully finished")
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index a3798c6..1f6a7dc 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -88,6 +88,7 @@ class ComponentStatusExecutor(threading.Thread):
logging.info("Status for {0} has changed to {1}".format(component_name, status))
cluster_reports[cluster_id].append(result)
+ # TODO STOMP: what if not registered?
self.send_updates_to_server(cluster_reports)
self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
except:
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 3fbb485..6a054cc 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -28,6 +28,7 @@ SERVER_RESPONSES_TOPIC = '/user/'
TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
+COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
HEARTBEAT_ENDPOINT = '/heartbeat'
REGISTRATION_ENDPOINT = '/register'
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 8b8a8f9..656e9a1 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -78,7 +78,7 @@ class CustomServiceOrchestrator():
# Property name for credential store class path
CREDENTIAL_STORE_CLASS_PATH_NAME = 'credentialStoreClassPath'
- def __init__(self, config, controller):
+ def __init__(self, config):
self.config = config
self.tmp_dir = config.get('agent', 'prefix')
self.force_https_protocol = config.get_force_https_protocol()
@@ -89,8 +89,8 @@ class CustomServiceOrchestrator():
self.status_commands_stderr = os.path.join(self.tmp_dir,
'status_command_stderr.txt')
self.public_fqdn = hostname.public_hostname(config)
- # cache reset will be called on every agent registration
- controller.registration_listeners.append(self.file_cache.reset)
+ # TODO STOMP: cache reset should be called on every agent registration
+ #controller.registration_listeners.append(self.file_cache.reset)
# Construct the hadoop credential lib JARs path
self.credential_shell_lib_path = os.path.join(config.get('security', 'credential_lib_dir',
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 57748a0..70fe7e7 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -29,6 +29,7 @@ from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListen
from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
+from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
HEARTBEAT_INTERVAL = 10
@@ -49,10 +50,11 @@ class HeartbeatThread(threading.Thread):
# listeners
self.server_responses_listener = ServerResponsesListener()
+ self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
- self.listeners = [self.server_responses_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+ self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
def run(self):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index e1b4ed7..c36bd68 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -28,6 +28,8 @@ from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
from ambari_agent.Utils import lazy_property
from ambari_agent.security import AmbariStompConnection
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.CommandStatusDict import CommandStatusDict
logger = logging.getLogger()
@@ -52,6 +54,7 @@ class InitializerModule:
self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port')
self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
+ self.command_reports_interval = int(self.ambariConfig.get('agent', 'command_reports_interval', default='5'))
self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
def init(self):
@@ -64,6 +67,9 @@ class InitializerModule:
self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
+ self.commandStatuses = CommandStatusDict(self)
+ self.action_queue = ActionQueue(self)
+
@lazy_property
def connection(self):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index ea6f895..6008f39 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -127,8 +127,9 @@ class PythonExecutor(object):
"""
Log some useful information after task failure.
"""
- logger.info("Command " + pprint.pformat(pythonCommand) + " failed with exitcode=" + str(result['exitcode']))
- log_process_information(logger)
+ pass
+ #logger.info("Command " + pprint.pformat(pythonCommand) + " failed with exitcode=" + str(result['exitcode']))
+ #log_process_information(logger)
def prepare_process_result(self, returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
new file mode 100644
index 0000000..b851443
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class CommandsEventListener(EventListener):
+ """
+ Listener of Constants.COMMANDS_TOPIC events from server.
+ """
+ def __init__(self, action_queue):
+ self.action_queue = action_queue
+
+ def on_event(self, headers, message):
+ """
+ Is triggered when an event to Constants.COMMANDS_TOPIC topic is received from server.
+
+ @param headers: headers dictionary
+ @param message: message payload dictionary
+ """
+ commands = []
+ for cluster_id in message.keys():
+ cluster_dict = message[cluster_id]
+ host_level_params = cluster_dict['hostLevelParams']
+ for command in cluster_dict['commands']:
+ command['hostLevelParams'] = host_level_params
+ commands.append(command)
+
+ self.action_queue.put(commands)
+
+ def get_handled_path(self):
+ return Constants.COMMANDS_TOPIC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index 722ec3c..20b42e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -31,7 +31,7 @@ class ConfigurationEventListener(EventListener):
Listener of Constants.CONFIGURATIONS_TOPIC events from server.
"""
def __init__(self, configuration_cache):
- self.topology_cache = configuration_cache
+ self.configuration_cache = configuration_cache
def on_event(self, headers, message):
"""
@@ -40,7 +40,7 @@ class ConfigurationEventListener(EventListener):
@param headers: headers dictionary
@param message: message payload dictionary
"""
- self.topology_cache.update_cache(message)
+ self.configuration_cache.update_cache(message)
def get_handled_path(self):
return Constants.CONFIGURATIONS_TOPIC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 2b7e9bc..45b38ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -48,8 +48,10 @@ class EventListener(ambari_stomp.ConnectionListener):
logger.info("Received event from {0}".format(destination))
logger.debug("Received event from {0}: headers={1} ; message={2}".format(destination, headers, message))
-
- self.on_event(headers, message_json)
+ try:
+ self.on_event(headers, message_json)
+ except:
+ logger.exception("Exception while handing event from {0}: headers={1} ; message={2}".format(destination, headers, message))
def on_event(self, headers, message):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 29eb926..72d6c70 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -112,6 +112,7 @@ from resource_management.core.logger import Logger
from ambari_agent import HeartbeatThread
from ambari_agent.InitializerModule import InitializerModule
from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+from ambari_agent.CommandStatusReporter import CommandStatusReporter
logger = logging.getLogger()
alerts_logger = logging.getLogger('ambari_alerts')
@@ -361,6 +362,11 @@ def run_threads():
component_status_executor = ComponentStatusExecutor(initializer_module)
component_status_executor.start()
+ command_status_reporter = CommandStatusReporter(initializer_module)
+ command_status_reporter.start()
+
+ initializer_module.action_queue.start()
+
while not initializer_module.stop_event.is_set():
time.sleep(0.1)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index cab8fe1..9d59222 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -29,19 +29,29 @@ from BaseStompServerTestCase import BaseStompServerTestCase
from ambari_agent import HeartbeatThread
from ambari_agent.InitializerModule import InitializerModule
from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+from ambari_agent.CommandStatusReporter import CommandStatusReporter
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
from mock.mock import MagicMock, patch
class TestAgentStompResponses(BaseStompServerTestCase):
- def test_mock_server_can_start(self):
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ def test_mock_server_can_start(self, runCommand_mock):
+ runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
self.init_stdout_logger()
self.remove(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
+ if not os.path.exists("/tmp/ambari-agent"):
+ os.mkdir("/tmp/ambari-agent")
+
initializer_module = InitializerModule()
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
heartbeat_thread.start()
+ action_queue = initializer_module.action_queue
+ action_queue.start()
+
connect_frame = self.server.frames_queue.get()
users_subscribe_frame = self.server.frames_queue.get()
commands_subscribe_frame = self.server.frames_queue.get()
@@ -53,6 +63,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
component_status_executor = ComponentStatusExecutor(initializer_module)
component_status_executor.start()
+ command_status_reporter = CommandStatusReporter(initializer_module)
+ command_status_reporter.start()
+
status_reports_frame = self.server.frames_queue.get()
# server sends registration response
@@ -72,6 +85,12 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.server.topic_manager.send(f)
heartbeat_frame = self.server.frames_queue.get()
+ dn_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ dn_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+ zk_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ zk_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+ action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
initializer_module.stop_event.set()
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'}))
@@ -79,10 +98,16 @@ class TestAgentStompResponses(BaseStompServerTestCase):
heartbeat_thread.join()
component_status_executor.join()
+ command_status_reporter.join()
+ action_queue.join()
self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
+ self.assertEquals(dn_status_in_progress_frame[0]['roleCommand'], 'START')
+ self.assertEquals(dn_status_in_progress_frame[0]['role'], 'DATANODE')
+ self.assertEquals(dn_status_in_progress_frame[0]['status'], 'IN_PROGRESS')
+ self.assertEquals(dn_status_failed_frame[0]['status'], 'FAILED')
"""
============================================================================================
@@ -95,6 +120,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
heartbeat_thread.start()
+ action_queue = initializer_module.action_queue
+ action_queue.start()
+
connect_frame = self.server.frames_queue.get()
users_subscribe_frame = self.server.frames_queue.get()
commands_subscribe_frame = self.server.frames_queue.get()
@@ -107,6 +135,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
component_status_executor = ComponentStatusExecutor(initializer_module)
component_status_executor.start()
+ command_status_reporter = CommandStatusReporter(initializer_module)
+ command_status_reporter.start()
+
status_reports_frame = self.server.frames_queue.get()
self.assertEquals(clusters_hashes['metadata_hash'], '21724f6ffa7aff0fe91a0c0c5b765dba')
@@ -125,6 +156,8 @@ class TestAgentStompResponses(BaseStompServerTestCase):
heartbeat_thread.join()
component_status_executor.join()
+ command_status_reporter.join()
+ action_queue.join()
def remove(self, filepathes):
for filepath in filepathes:
http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index bf54b97..525af5c 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -32,10 +32,13 @@
{
"requestId":5,
"taskId":9,
+ "commandId":1,
"serviceName":"HDFS",
"role":"DATANODE",
"commandType":"EXECUTION_COMMAND",
"roleCommand":"START",
+ "clusterName": "c1",
+ "clusterId": 0,
"configuration_credentials":{
},
@@ -45,7 +48,7 @@
"script":"scripts/datanode.py",
"phase":"INITIAL_START",
"max_duration_for_retries":"600",
- "command_retry_enabled":"true",
+ "command_retry_enabled":"false",
"command_timeout":"1200",
"refresh_topology":"True",
"script_type":"PYTHON"
@@ -54,10 +57,13 @@
{
"requestId":6,
"taskId":9,
+ "commandId":0,
+ "clusterId": "null",
"serviceName":"ZOOKEEPER",
"role":"ZOOKEEPER_SERVER",
"commandType":"EXECUTION_COMMAND",
"roleCommand":"START",
+ "clusterName": "c1",
"configuration_credentials":{
},
@@ -67,7 +73,7 @@
"script":"scripts/datanode.py",
"phase":"INITIAL_START",
"max_duration_for_retries":"600",
- "command_retry_enabled":"true",
+ "command_retry_enabled":"false",
"command_timeout":"1200",
"refresh_topology":"True",
"script_type":"PYTHON"
@@ -88,6 +94,8 @@
"role":"check_host",
"commandType":"EXECUTION_COMMAND",
"taskId":2,
+ "commandId":1,
+ "clusterId": "null",
"commandParams":{
"script":"check_host.py",
"check_execute_list":"host_resolution_check",