You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/06/01 12:05:27 UTC
ambari git commit: AMBARI-21165. Register with server and changes to
events format and handle graceful stop of threads (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 6303e9358 -> 5d0962421
AMBARI-21165. Register with server and changes to events format and handle graceful stop of threads (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5d096242
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5d096242
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5d096242
Branch: refs/heads/branch-3.0-perf
Commit: 5d0962421397d64b5e7a383eb23fb6ae22028b0b
Parents: 6303e93
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Jun 1 15:05:03 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Jun 1 15:05:03 2017 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 9 +-
.../python/ambari_agent/ClusterTopologyCache.py | 36 +++-
.../python/ambari_agent/CommandStatusDict.py | 12 +-
.../ambari_agent/CommandStatusReporter.py | 14 +-
.../ambari_agent/ComponentStatusExecutor.py | 8 +-
.../src/main/python/ambari_agent/Constants.py | 4 +-
.../python/ambari_agent/HeartbeatHandlers.py | 12 +-
.../main/python/ambari_agent/HeartbeatThread.py | 71 +++++--
.../src/main/python/ambari_agent/NetUtil.py | 23 +--
.../src/main/python/ambari_agent/Register.py | 20 +-
.../src/main/python/ambari_agent/Utils.py | 28 ++-
.../listeners/CommandsEventListener.py | 7 +-
.../python/ambari_agent/listeners/__init__.py | 4 +-
.../src/main/python/ambari_agent/main.py | 30 +--
.../ambari_agent/BaseStompServerTestCase.py | 14 +-
.../ambari_agent/TestAgentStompResponses.py | 62 ++++---
.../dummy_files/stomp/execution_commands.json | 186 ++++++++-----------
.../stomp/registration_response.json | 2 +-
18 files changed, 293 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index e9a3045..dbd9f4c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -364,9 +364,10 @@ class ActionQueue(threading.Thread):
roleResult['stderr'] = 'None'
# let ambari know name of custom command
+ """
if command['hostLevelParams'].has_key('custom_command'):
roleResult['customCommand'] = command['hostLevelParams']['custom_command']
-
+ """
if 'structuredOut' in commandresult:
roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
else:
@@ -566,12 +567,6 @@ class ActionQueue(threading.Thread):
logger.warn(err)
pass
-
- # Store action result to agent response queue
- def result(self):
- return self.commandStatuses.generate_report()
-
-
def status_update_callback(self):
"""
Actions that are executed every time when command status changes
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 1102cd1..5810e67 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -42,20 +42,35 @@ class ClusterTopologyCache(ClusterCache):
return 'topology'
@staticmethod
- def find_host_by_id(host_dicts, cluster_id, host_id):
+ def find_host_by_id(host_dicts, host_id):
+ """
+ Find host by id in list of host dictionaries.
+ """
for host_dict in host_dicts:
if host_dict['hostId'] == host_id:
return host_dict
return None
@staticmethod
- def find_component(component_dicts, cluster_id, service_name, component_name):
+ def find_component(component_dicts, service_name, component_name):
+ """
+ Find component by service_name and component_name in list of component dictionaries.
+ """
for component_dict in component_dicts:
if component_dict['serviceName'] == service_name and component_dict['componentName'] == component_name:
return component_dict
return None
def cache_update(self, cache_update):
+ """
+ Handle event of update of topology.
+
+ Possible scenarios are:
+ - add new host
+ - update existing host information by hostId (e.g. rack name)
+ - add new component
+ - update component information by service_name and component_name
+ """
mutable_dict = self._get_mutable_copy()
for cluster_id, cluster_updates_dict in cache_update.iteritems():
@@ -66,7 +81,7 @@ class ClusterTopologyCache(ClusterCache):
if 'hosts' in cluster_updates_dict:
hosts_mutable_list = mutable_dict[cluster_id]['hosts']
for host_updates_dict in cluster_updates_dict['hosts']:
- host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, host_updates_dict['hostId'])
+ host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId'])
if host_mutable_dict is not None:
host_mutable_dict.update(host_updates_dict)
else:
@@ -75,7 +90,7 @@ class ClusterTopologyCache(ClusterCache):
if 'components' in cluster_updates_dict:
components_mutable_list = mutable_dict[cluster_id]['components']
for component_updates_dict in cluster_updates_dict['components']:
- component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName'])
+ component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
if component_mutable_dict is not None:
component_updates_dict['hostIds'] += component_mutable_dict['hostIds']
component_updates_dict['hostIds'] = list(set(component_updates_dict['hostIds']))
@@ -86,6 +101,15 @@ class ClusterTopologyCache(ClusterCache):
self.rewrite_cache(mutable_dict)
def cache_delete(self, cache_update):
+ """
+ Handle event of delete on topology.
+
+ Possible scenarios are:
+ - delete host
+ - delete component
+ - delete component host
+ - delete cluster
+ """
mutable_dict = self._get_mutable_copy()
clusters_ids_to_delete = []
@@ -97,7 +121,7 @@ class ClusterTopologyCache(ClusterCache):
if 'hosts' in cluster_updates_dict:
hosts_mutable_list = mutable_dict[cluster_id]['hosts']
for host_updates_dict in cluster_updates_dict['hosts']:
- host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, host_updates_dict['hostId'])
+ host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId'])
if host_to_delete is not None:
mutable_dict[cluster_id]['hosts'] = [host_dict for host_dict in hosts_mutable_list if host_dict != host_to_delete]
else:
@@ -106,7 +130,7 @@ class ClusterTopologyCache(ClusterCache):
if 'components' in cluster_updates_dict:
components_mutable_list = mutable_dict[cluster_id]['components']
for component_updates_dict in cluster_updates_dict['components']:
- component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName'])
+ component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
if 'hostIds' in component_mutable_dict:
exclude_host_ids = component_updates_dict['hostIds']
component_mutable_dict['hostIds'] = [host_id for host_id in component_mutable_dict['hostIds'] if host_id not in exclude_host_ids]
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index afaf77d..d0f6801 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -43,6 +43,7 @@ class CommandStatusDict():
self.current_state = {} # Contains all statuses
self.lock = threading.RLock()
self.initializer_module = initializer_module
+ self.reported_reports = set()
def put_command_status(self, command, new_report):
@@ -52,6 +53,7 @@ class CommandStatusDict():
key = command['taskId']
with self.lock: # Synchronized
self.current_state[key] = (command, new_report)
+ self.reported_reports.discard(key)
self.force_update_to_server([new_report])
@@ -69,6 +71,7 @@ class CommandStatusDict():
FAILED. Statuses for COMPLETE or FAILED commands are forgotten after
generation
"""
+ self.generated_reports = []
from ActionQueue import ActionQueue
with self.lock: # Synchronized
resultReports = []
@@ -78,17 +81,20 @@ class CommandStatusDict():
if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
resultReports.append(report)
- # Removing complete/failed command status from dict
- del self.current_state[key]
+ self.reported_reports.append(key)
else:
in_progress_report = self.generate_in_progress_report(command, report)
resultReports.append(in_progress_report)
elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
- del self.current_state[key]
+ self.reported_reports.append(key)
pass
return resultReports
+ def clear_reported_reports(self):
+ with self.lock:
+ for key in self.reported_reports:
+ del self.current_state[key]
def generate_in_progress_report(self, command, report):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
index 216f20b..6ee4474 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -42,12 +42,14 @@ class CommandStatusReporter(threading.Thread):
while not self.stop_event.is_set():
try:
- # TODO STOMP: if not registered, reports should not be on agent until next registration
- report = self.commandStatuses.generate_report()
- if report and self.initializer_module.is_registered:
- self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
- self.stop_event.wait(self.command_reports_interval)
+ if self.initializer_module.is_registered:
+ report = self.commandStatuses.generate_report()
+ if report:
+ self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+ self.commandStatuses.clear_reported_reports()
except:
logger.exception("Exception in CommandStatusReporter. Re-running it")
- pass
+
+ self.stop_event.wait(self.command_reports_interval)
+
logger.info("CommandStatusReporter has successfully finished")
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 41f0df4..3a2e105 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -72,7 +72,7 @@ class ComponentStatusExecutor(threading.Thread):
component_name = component_dict.componentName
# TODO STOMP: run real command
- logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.servicePackageFolder, component_dict.statusCommandParams.script))
+ logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.service_package_folder, component_dict.statusCommandParams.script))
#self.customServiceOrchestrator.requestComponentStatus(command)
status = random.choice(["INSTALLED","STARTED"])
result = {
@@ -87,16 +87,14 @@ class ComponentStatusExecutor(threading.Thread):
logging.info("Status for {0} has changed to {1}".format(component_name, status))
cluster_reports[cluster_id].append(result)
- # TODO STOMP: what if not registered?
self.send_updates_to_server(cluster_reports)
- self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
except:
logger.exception("Exception in ComponentStatusExecutor. Re-running it")
- pass
+
+ self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
logger.info("ComponentStatusExecutor has successfully finished")
def send_updates_to_server(self, cluster_reports):
- # TODO STOMP: override send to send dicts and lists? and not use json.dump
if not cluster_reports or not self.initializer_module.is_registered:
return
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index e15d1d8..8fafca1 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -22,13 +22,13 @@ limitations under the License.
COMMANDS_TOPIC = '/user/commands'
CONFIGURATIONS_TOPIC = '/user/configs'
METADATA_TOPIC = '/events/metadata'
-TOPOLOGIES_TOPIC = '/events/topology'
+TOPOLOGIES_TOPIC = '/events/topologies'
SERVER_RESPONSES_TOPIC = '/user/'
PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
-TOPOLOGY_REQUEST_ENDPOINT = '/agents/topology'
+TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
METADATA_REQUEST_ENDPOINT = '/agents/metadata'
CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
index 836ab07..fefe32a 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
@@ -26,7 +26,6 @@ import signal
import threading
import traceback
from ambari_commons.os_family_impl import OsFamilyImpl
-import sys
from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
@@ -80,15 +79,14 @@ class HeartbeatStopHandlersWindows(HeartbeatStopHandlers):
# linux impl
def signal_handler(signum, frame):
- global _handler
logger.info("Ambari-agent received {0} signal, stopping...".format(signum))
- _handler.set_stop()
+ _handler.set()
def debug(sig, frame):
"""Interrupt running process, and provide a stacktrace of threads """
d = {'_frame': frame} # Allow access to frame object.
- d.update(frame.f_globals) # Unless shadowed by global
+ d.update(frame.f_globals) # Unless shadowed by global
d.update(frame.f_locals)
message = "Signal received.\nTraceback:\n"
@@ -123,7 +121,7 @@ class HeartbeatStopHandlersLinux(HeartbeatStopHandlers):
-def bind_signal_handlers(agentPid):
+def bind_signal_handlers(agentPid, stop_event):
global _handler
if OSCheck.get_os_family() != OSConst.WINSRV_FAMILY:
if os.getpid() == agentPid:
@@ -132,7 +130,7 @@ def bind_signal_handlers(agentPid):
bind_debug_signal_handlers()
- _handler = HeartbeatStopHandlersLinux()
+ _handler = stop_event
else:
- _handler = HeartbeatStopHandlersWindows()
+ _handler = stop_event
return _handler
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index b7bb5ed..e54c0c1 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -23,6 +23,9 @@ import ambari_stomp
import threading
from ambari_agent import Constants
+from ambari_agent.Register import Register
+from ambari_agent.Utils import BlockingDictionary
+from ambari_agent.Utils import Utils
from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
@@ -30,6 +33,7 @@ from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
HEARTBEAT_INTERVAL = 10
+REQUEST_RESPONSE_TIMEOUT = 10
logger = logging.getLogger(__name__)
@@ -42,6 +46,8 @@ class HeartbeatThread(threading.Thread):
self.heartbeat_interval = HEARTBEAT_INTERVAL
self.stop_event = initializer_module.stop_event
+ self.registration_builder = Register(initializer_module.ambariConfig)
+
self.initializer_module = initializer_module
self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
@@ -52,18 +58,19 @@ class HeartbeatThread(threading.Thread):
self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+
self.post_registration_requests = [
(Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener),
(Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener),
(Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener)
]
+ self.responseId = 0
def run(self):
"""
Run an endless loop of heartbeat with registration upon init or exception in heartbeating.
"""
- # TODO STOMP: stop the thread on SIGTERM
while not self.stop_event.is_set():
try:
if not self.initializer_module.is_registered:
@@ -73,17 +80,17 @@ class HeartbeatThread(threading.Thread):
logger.debug("Heartbeat body is {0}".format(heartbeat_body))
response = self.blocking_request(heartbeat_body, Constants.HEARTBEAT_ENDPOINT)
logger.debug("Heartbeat response is {0}".format(response))
-
- self.stop_event.wait(self.heartbeat_interval)
- # TODO STOMP: handle heartbeat reponse
+ self.handle_heartbeat_reponse(response)
except:
logger.exception("Exception in HeartbeatThread. Re-running the registration")
- self.stop_event.wait(self.heartbeat_interval)
self.initializer_module.is_registered = False
self.initializer_module.connection.disconnect()
- pass
+ delattr(self.initializer_module, '_connection')
+
+ self.stop_event.wait(self.heartbeat_interval)
self.initializer_module.connection.disconnect()
+ delattr(self.initializer_module, '_connection')
logger.info("HeartbeatThread has successfully finished")
def register(self):
@@ -93,7 +100,7 @@ class HeartbeatThread(threading.Thread):
self.add_listeners()
self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE)
- registration_request = self.get_registration_request()
+ registration_request = self.registration_builder.build()
logger.info("Sending registration request")
logger.debug("Registration request is {0}".format(registration_request))
@@ -102,26 +109,53 @@ class HeartbeatThread(threading.Thread):
logger.info("Registration response received")
logger.debug("Registration response is {0}".format(response))
- self.registration_response = response
+ self.handle_registration_response(response)
for endpoint, cache, listener in self.post_registration_requests:
+ # should not hang forever on these requests
response = self.blocking_request({'hash': cache.hash}, endpoint)
- listener.on_event({}, response)
+ try:
+ listener.on_event({}, response)
+ except:
+ logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
+ raise
self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
self.initializer_module.is_registered = True
- def get_registration_request(self):
- """
- Get registration request body to send it to server
- """
- return {'registration-response':'true'}
+ def handle_registration_response(self, response):
+ # exitstatus is a code of error which was raised on server side.
+ # exitstatus = 0 (OK - Default)
+ # exitstatus = 1 (Registration failed because different version of agent and server)
+ exitstatus = 0
+ if 'exitstatus' in response.keys():
+ exitstatus = int(response['exitstatus'])
+
+ if exitstatus != 0:
+ # log - message, which will be printed to agents log
+ if 'log' in response.keys():
+ error_message = "Registration failed due to: {0}".format(response['log'])
+ else:
+ error_message = "Registration failed"
+
+ raise Exception(error_message)
+
+ self.responseId = int(response['id'])
+
+ def handle_heartbeat_reponse(self, response):
+ serverId = int(response['id'])
+
+ if serverId != self.responseId + 1:
+ logger.error("Error in responseId sequence - restarting")
+ Utils.restartAgent()
+ else:
+ self.responseId = serverId
def get_heartbeat_body(self):
"""
Heartbeat body to be send to server
"""
- return {'hostname':'true'}
+ return {'id':self.responseId}
def add_listeners(self):
"""
@@ -134,9 +168,12 @@ class HeartbeatThread(threading.Thread):
for topic_name in topics_list:
self.initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')
- def blocking_request(self, message, destination):
+ def blocking_request(self, message, destination, timeout=REQUEST_RESPONSE_TIMEOUT):
"""
Send a request to server and wait for the response from it. The response is detected by the correspondence of correlation_id.
"""
correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
- return self.server_responses_listener.responses.blocking_pop(str(correlation_id))
\ No newline at end of file
+ try:
+ return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout)
+ except BlockingDictionary.DictionaryPopTimeout:
+ raise Exception("{0} seconds timeout expired waiting for response from server at {1} to message from {2}".format(timeout, Constants.SERVER_RESPONSES_TOPIC, destination))
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/NetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
index 9b29633..3f4895d 100644
--- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py
+++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
@@ -19,7 +19,6 @@ import logging
import httplib
import sys
from ssl import SSLError
-from HeartbeatHandlers import HeartbeatStopHandlers
from ambari_agent.AmbariConfig import AmbariConfig
from ambari_commons.inet_utils import ensure_ssl_using_protocol
@@ -44,16 +43,8 @@ class NetUtil:
# For testing purposes
DEBUG_STOP_RETRIES_FLAG = False
- # Stop implementation
- # Typically, it waits for a certain time for the daemon/service to receive the stop signal.
- # Received the number of seconds to wait as an argument
- # Returns true if the application is stopping, false if continuing execution
- stopCallback = None
-
- def __init__(self, config, stop_callback=None):
- if stop_callback is None:
- stop_callback = HeartbeatStopHandlers()
- self.stopCallback = stop_callback
+ def __init__(self, config, stop_event=None):
+ self.stop_event = stop_event
self.config = config
self.connect_retry_delay = int(config.get('server','connect_retry_delay',
default=self.DEFAULT_CONNECT_RETRY_DELAY_SEC))
@@ -71,13 +62,13 @@ class NetUtil:
try:
parsedurl = urlparse(url)
-
+
if sys.version_info >= (2,7,9) and not ssl_verify_cert:
import ssl
ca_connection = httplib.HTTPSConnection(parsedurl[1], context=ssl._create_unverified_context())
else:
ca_connection = httplib.HTTPSConnection(parsedurl[1])
-
+
ca_connection.request("GET", parsedurl[2])
response = ca_connection.getresponse()
status = response.status
@@ -121,11 +112,13 @@ class NetUtil:
self.connect_retry_delay))
retries += 1
- if 0 == self.stopCallback.wait(self.connect_retry_delay):
+ self.stop_event.wait(self.connect_retry_delay)
+
+ if self.stop_event.is_set():
#stop waiting
if logger is not None:
logger.info("Stop event received")
- self.DEBUG_STOP_RETRIES_FLAG = True
+ break
return retries, connected, self.DEBUG_STOP_RETRIES_FLAG
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/Register.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Register.py b/ambari-agent/src/main/python/ambari_agent/Register.py
index 0c811c6..4c4bf32 100644
--- a/ambari-agent/src/main/python/ambari_agent/Register.py
+++ b/ambari-agent/src/main/python/ambari_agent/Register.py
@@ -18,15 +18,12 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import os
import time
-import subprocess
-from Hardware import Hardware
-import hostname
-from HostInfo import HostInfo
+from ambari_agent import hostname
+from ambari_agent.Hardware import Hardware
+from ambari_agent.HostInfo import HostInfo
+from ambari_agent.Utils import Utils
-
-firstContact = True
class Register:
""" Registering with the server. Get the hardware profile and
declare success for now """
@@ -34,24 +31,23 @@ class Register:
self.config = config
self.hardware = Hardware(self.config)
- def build(self, version, id='-1'):
- global clusterId, clusterDefinitionRevision, firstContact
+ def build(self, response_id='-1'):
timestamp = int(time.time()*1000)
hostInfo = HostInfo(self.config)
agentEnv = { }
hostInfo.register(agentEnv, False, False)
- current_ping_port = self.config.get('agent','current_ping_port')
+ current_ping_port = self.config.get('agent','ping_port')
- register = { 'responseId' : int(id),
+ register = { 'id' : int(response_id),
'timestamp' : timestamp,
'hostname' : hostname.hostname(self.config),
'currentPingPort' : int(current_ping_port),
'publicHostname' : hostname.public_hostname(self.config),
'hardwareProfile' : self.hardware.get(),
'agentEnv' : agentEnv,
- 'agentVersion' : version,
+ 'agentVersion' : Utils.read_agent_version(self.config),
'prefix' : self.config.get('agent', 'prefix')
}
return register
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index f94eba0..845eb30 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -17,8 +17,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import os
import threading
from functools import wraps
+from ambari_agent.ExitHelper import ExitHelper
+
+AGENT_AUTO_RESTART_EXIT_CODE = 77
class BlockingDictionary():
"""
@@ -39,16 +43,20 @@ class BlockingDictionary():
self.dict[key] = value
self.put_event.set()
- def blocking_pop(self, key):
+ def blocking_pop(self, key, timeout=None):
"""
Block until a key in dictionary is available and then pop it.
+ If the timeout is exceeded, BlockingDictionary.DictionaryPopTimeout is raised.
"""
with self.dict_lock:
if key in self.dict:
return self.dict.pop(key)
while True:
- self.put_event.wait()
+ self.put_event.wait(timeout)
+ if not self.put_event.is_set():
+ raise BlockingDictionary.DictionaryPopTimeout()
+
self.put_event.clear()
with self.dict_lock:
if key in self.dict:
@@ -60,6 +68,9 @@ class BlockingDictionary():
def __str__(self):
return self.dict.__str__()
+ class DictionaryPopTimeout(Exception):
+ pass
+
class Utils(object):
@staticmethod
def make_immutable(value):
@@ -84,6 +95,19 @@ class Utils(object):
return param
+ @staticmethod
+ def read_agent_version(config):
+ data_dir = config.get('agent', 'prefix')
+ ver_file = os.path.join(data_dir, 'version')
+ with open(ver_file, "r") as f:
+ version = f.read().strip()
+ return version
+
+ @staticmethod
+ def restartAgent():
+ # TODO STOMP: set stop event?
+ ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE)
+
class ImmutableDictionary(dict):
def __init__(self, dictionary):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
index b851443..c3839cb 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -40,12 +40,11 @@ class CommandsEventListener(EventListener):
@param headers: headers dictionary
@param message: message payload dictionary
"""
+ ""
commands = []
- for cluster_id in message.keys():
- cluster_dict = message[cluster_id]
- host_level_params = cluster_dict['hostLevelParams']
+ for cluster_id in message['clusters'].keys():
+ cluster_dict = message['clusters'][cluster_id]
for command in cluster_dict['commands']:
- command['hostLevelParams'] = host_level_params
commands.append(command)
self.action_queue.put(commands)
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index ddc5900..3dead4b 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -17,7 +17,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
-import json
+import ambari_simplejson as json
import ambari_stomp
import logging
@@ -38,14 +38,12 @@ class EventListener(ambari_stomp.ConnectionListener):
return
destination = headers['destination']
-
if destination.rstrip('/') == self.get_handled_path().rstrip('/'):
try:
message_json = json.loads(message)
except ValueError:
logger.exception("Received from server event is not a valid message json. Message is:\n{0}".format(message))
return
-
logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json)))
try:
self.on_event(headers, message_json)
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 72d6c70..a1a98e8 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -79,11 +79,8 @@ fix_encoding_reimport_bug()
import logging.handlers
import logging.config
-import signal
from optparse import OptionParser
import sys
-import traceback
-import getpass
import os
import time
import locale
@@ -92,7 +89,6 @@ import ConfigParser
import ProcessHelper
import resource
from logging.handlers import SysLogHandler
-from Controller import Controller
import AmbariConfig
from NetUtil import NetUtil
from PingPortListener import PingPortListener
@@ -102,9 +98,7 @@ from ambari_agent.ExitHelper import ExitHelper
import socket
from ambari_commons import OSConst, OSCheck
from ambari_commons.shell import shellRunner
-from ambari_commons.network import reconfigure_urllib2_opener
-from ambari_commons import shell
-import HeartbeatHandlers
+#from ambari_commons.network import reconfigure_urllib2_opener
from HeartbeatHandlers import bind_signal_handlers
from ambari_commons.constants import AMBARI_SUDO_BINARY
from resource_management.core.logger import Logger
@@ -349,13 +343,7 @@ def reset_agent(options):
MAX_RETRIES = 10
-# TODO STOMP: remove from globals
-initializer_module = None
-
-def run_threads():
- global initializer_module
- initializer_module = InitializerModule()
-
+def run_threads(initializer_module):
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
heartbeat_thread.start()
@@ -370,13 +358,12 @@ def run_threads():
while not initializer_module.stop_event.is_set():
time.sleep(0.1)
- # TODO STOMP: if thread cannot stop by itself kill it hard after some timeout.
heartbeat_thread.join()
component_status_executor.join()
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
# we need this for windows os, where no sigterm available
-def main(heartbeat_stop_callback=None):
+def main(initializer_module, heartbeat_stop_callback=None):
global config
global home_dir
@@ -449,7 +436,7 @@ def main(heartbeat_stop_callback=None):
if not config.use_system_proxy_setting():
logger.info('Agent is configured to ignore system proxy settings')
- reconfigure_urllib2_opener(ignore_system_proxy=True)
+ #reconfigure_urllib2_opener(ignore_system_proxy=True)
if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
daemonize()
@@ -475,7 +462,7 @@ def main(heartbeat_stop_callback=None):
logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)
# Wait until MAX_RETRIES to see if server is reachable
- netutil = NetUtil(config, heartbeat_stop_callback)
+ netutil = NetUtil(config, initializer_module.stop_event)
(retries, connected, stopped) = netutil.try_to_connect(server_url, MAX_RETRIES, logger)
# if connected, launch controller
@@ -484,7 +471,7 @@ def main(heartbeat_stop_callback=None):
# Set the active server
active_server = server_hostname
# Launch Controller communication
- run_threads()
+ run_threads(initializer_module)
#
# If Ambari Agent connected to the server or
@@ -503,9 +490,10 @@ def main(heartbeat_stop_callback=None):
if __name__ == "__main__":
is_logger_setup = False
try:
- heartbeat_stop_callback = bind_signal_handlers(agentPid)
+ initializer_module = InitializerModule()
+ heartbeat_stop_callback = bind_signal_handlers(agentPid, initializer_module.stop_event)
- main(heartbeat_stop_callback)
+ main(initializer_module, heartbeat_stop_callback)
except SystemExit:
raise
except BaseException:
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 87417fc..5db53c8 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -54,7 +54,6 @@ class BaseStompServerTestCase(unittest.TestCase):
"""
def setUp(self):
-
self.clients = []
self.server = None # This gets set in the server thread.
self.server_address = None # This gets set in the server thread.
@@ -100,7 +99,7 @@ class BaseStompServerTestCase(unittest.TestCase):
def tearDown(self):
for c in self.clients:
c.close()
- self.server.shutdown() # server_close takes too much time
+ self.server.server_close()
self.server_thread.join()
self.ready_event.clear()
del self.server_thread
@@ -157,6 +156,17 @@ class BaseStompServerTestCase(unittest.TestCase):
if os.path.isfile(filepath):
os.remove(filepath)
+ def assert_with_retries(self, func, tries, try_sleep):
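+ # call func up to `tries` times, sleeping `try_sleep` seconds after each AssertionError; if every try fails, one final call lets the assertion propagate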
+ for i in range(tries):
+ try:
+ func()
+ break
+ except AssertionError:
+ time.sleep(try_sleep)
+ else:
+ func()
+
class TestStompServer(ThreadedStompServer):
"""
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 87c7f57..c969c75 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -36,16 +36,21 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
from mock.mock import MagicMock, patch
class TestAgentStompResponses(BaseStompServerTestCase):
- """
- @patch.object(CustomServiceOrchestrator, "runCommand")
- def test_mock_server_can_start(self, runCommand_mock):
- runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
-
+ def setUp(self):
self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
if not os.path.exists("/tmp/ambari-agent"):
os.mkdir("/tmp/ambari-agent")
+ with open("/tmp/ambari-agent/version", "w") as fp:
+ fp.write("2.5.0.0")
+
+ return super(TestAgentStompResponses, self).setUp()
+
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ def test_mock_server_can_start(self, runCommand_mock):
+ runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
+
initializer_module = InitializerModule()
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
heartbeat_thread.start()
@@ -101,7 +106,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
@@ -118,8 +123,8 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
- ============================================================================================
- ============================================================================================
+ #============================================================================================
+ #============================================================================================
initializer_module = InitializerModule()
@@ -163,24 +168,15 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
component_status_executor.join()
command_status_reporter.join()
action_queue.join()
- """
-
- @patch.object(CustomServiceOrchestrator, "runCommand")
- def test_topology_update_and_delete(self, runCommand_mock):
- runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
-
- self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
-
- if not os.path.exists("/tmp/ambari-agent"):
- os.mkdir("/tmp/ambari-agent")
+ def test_topology_update_and_delete(self):
initializer_module = InitializerModule()
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
heartbeat_thread.start()
@@ -211,34 +207,40 @@ class TestAgentStompResponses(BaseStompServerTestCase):
while not initializer_module.is_registered:
time.sleep(0.1)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_component.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_add_component.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_component_host.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_add_component_host.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_host.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_add_host.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_host.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_host.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_component.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_component.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_component_host.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_component_host.json"))
self.server.topic_manager.send(f)
- f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_cluster.json"))
+ f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_cluster.json"))
self.server.topic_manager.send(f)
- time.sleep(0.1)
- self.assertEquals(json.dumps(initializer_module.topology_cache, indent=2, sort_keys=True), json.dumps(self.get_dict_from_file("topology_cache_expected.json"), indent=2, sort_keys=True))
- #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json"))
+ def is_json_equal():
+ json_topology = json.dumps(initializer_module.topology_cache, indent=2, sort_keys=True)
+ json_excepted_lopology = json.dumps(self.get_dict_from_file("topology_cache_expected.json"), indent=2, sort_keys=True)
+ #print json_topology
+ #print json_excepted_lopology
+ self.assertEquals(json_topology, json_excepted_lopology)
+ #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json"))
+
+ self.assert_with_retries(is_json_equal, tries=40, try_sleep=0.1)
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 525af5c..536233d 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -1,115 +1,89 @@
{
- "0":{
- "hostLevelParams":{
- "agent_stack_retry_on_unavailability":"false",
- "unlimited_key_jce_required":"false",
- "group_list":"[\"hadoop\",\"users\"]",
- "host_sys_prepped":"false",
- "ambari_db_rca_username":"mapred",
- "jdk_name":"jdk-8u112-linux-x64.tar.gz",
- "mysql_jdbc_url":"http://gc6401:8080/resources//mysql-connector-java.jar",
- "agent_stack_retry_count":"5",
- "user_groups":"{}",
- "stack_version":"2.5",
- "stack_name":"HDP",
- "ambari_db_rca_driver":"org.postgresql.Driver",
- "java_home":"/usr/jdk64/jdk1.8.0_112",
- "jdk_location":"http://gc6401:8080/resources/",
- "not_managed_hdfs_path_list":"[\"/tmp\"]",
- "ambari_db_rca_url":"jdbc:postgresql://gc6401/ambarirca",
- "java_version":"8",
- "repo_info":"...",
- "db_name":"ambari",
- "agentCacheDir":"/var/lib/ambari-agent/cache",
- "ambari_db_rca_password":"mapred",
- "jce_name":"jce_policy-8.zip",
- "oracle_jdbc_url":"http://gc6401:8080/resources//ojdbc6.jar",
- "db_driver_filename":"mysql-connector-java.jar",
- "user_list":"[\"zookeeper\",\"ambari-qa\",\"hdfs\"]",
- "clientsToUpdateConfigs":"[\"*\"]"
- },
- "commands":[
- {
- "requestId":5,
- "taskId":9,
- "commandId":1,
- "serviceName":"HDFS",
- "role":"DATANODE",
- "commandType":"EXECUTION_COMMAND",
- "roleCommand":"START",
- "clusterName": "c1",
- "clusterId": 0,
- "configuration_credentials":{
+ "clusters": {
+ "0":{
+ "commands":[
+ {
+ "requestId":5,
+ "taskId":9,
+ "commandId":1,
+ "serviceName":"HDFS",
+ "role":"DATANODE",
+ "commandType":"EXECUTION_COMMAND",
+ "roleCommand":"START",
+ "clusterName": "c1",
+ "clusterId": 0,
+ "configuration_credentials":{
+ },
+ "commandParams":{
+ "service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
+ "hooks_folder":"HDP/2.0.6/hooks",
+ "script":"scripts/datanode.py",
+ "phase":"INITIAL_START",
+ "max_duration_for_retries":"600",
+ "command_retry_enabled":"false",
+ "command_timeout":"1200",
+ "refresh_topology":"True",
+ "script_type":"PYTHON"
+ }
},
- "commandParams":{
- "service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
- "hooks_folder":"HDP/2.0.6/hooks",
- "script":"scripts/datanode.py",
- "phase":"INITIAL_START",
- "max_duration_for_retries":"600",
- "command_retry_enabled":"false",
- "command_timeout":"1200",
- "refresh_topology":"True",
- "script_type":"PYTHON"
- }
- },
- {
- "requestId":6,
- "taskId":9,
- "commandId":0,
- "clusterId": "null",
- "serviceName":"ZOOKEEPER",
- "role":"ZOOKEEPER_SERVER",
- "commandType":"EXECUTION_COMMAND",
- "roleCommand":"START",
- "clusterName": "c1",
- "configuration_credentials":{
+ {
+ "requestId":6,
+ "taskId":9,
+ "commandId":0,
+ "clusterId": "null",
+ "serviceName":"ZOOKEEPER",
+ "role":"ZOOKEEPER_SERVER",
+ "commandType":"EXECUTION_COMMAND",
+ "roleCommand":"START",
+ "clusterName": "c1",
+ "configuration_credentials":{
- },
- "commandParams":{
- "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package",
- "hooks_folder":"HDP/2.0.6/hooks",
- "script":"scripts/datanode.py",
- "phase":"INITIAL_START",
- "max_duration_for_retries":"600",
- "command_retry_enabled":"false",
- "command_timeout":"1200",
- "refresh_topology":"True",
- "script_type":"PYTHON"
+ },
+ "commandParams":{
+ "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package",
+ "hooks_folder":"HDP/2.0.6/hooks",
+ "script":"scripts/datanode.py",
+ "phase":"INITIAL_START",
+ "max_duration_for_retries":"600",
+ "command_retry_enabled":"false",
+ "command_timeout":"1200",
+ "refresh_topology":"True",
+ "script_type":"PYTHON"
+ }
}
- }
- ]
- },
- "-1":{
- "hostLevelParams":{
- "agent_stack_retry_count":"5",
- "agent_stack_retry_on_unavailability":"false",
- "agentCacheDir":"/var/lib/ambari-agent/cache"
+ ]
},
- "commands":[
- {
- "roleCommand":"ACTIONEXECUTE",
- "serviceName":"null",
- "role":"check_host",
- "commandType":"EXECUTION_COMMAND",
- "taskId":2,
- "commandId":1,
- "clusterId": "null",
- "commandParams":{
- "script":"check_host.py",
- "check_execute_list":"host_resolution_check",
- "threshold":"20",
- "hosts":"gc6401",
- "command_timeout":"900",
- "script_type":"PYTHON"
- },
- "roleParams":{
- "threshold":"20",
- "check_execute_list":"host_resolution_check",
- "hosts":"gc6401"
+ "-1":{
+ "commands":[
+ {
+ "roleCommand":"ACTIONEXECUTE",
+ "serviceName":"null",
+ "role":"check_host",
+ "commandType":"EXECUTION_COMMAND",
+ "taskId":2,
+ "commandId":1,
+ "clusterId": "null",
+ "commandParams":{
+ "script":"check_host.py",
+ "check_execute_list":"host_resolution_check",
+ "threshold":"20",
+ "hosts":"gc6401",
+ "command_timeout":"900",
+ "script_type":"PYTHON"
+ },
+ "roleParams":{
+ "threshold":"20",
+ "check_execute_list":"host_resolution_check",
+ "hosts":"gc6401"
+ },
+ "hostLevelParams":{
+ "agent_stack_retry_count":"5",
+ "agent_stack_retry_on_unavailability":"false",
+ "agentCacheDir":"/var/lib/ambari-agent/cache"
+ }
}
- }
- ]
+ ]
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
index 6873d7b..5564794 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
@@ -1,5 +1,5 @@
{
- "responseId":0,
+ "id":0,
"exitstatus":0,
"errorMessage":"",
"clustersList":[