You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/07/04 08:29:36 UTC
ambari git commit: AMBARI-21394. Create a topic to send
hostLevelParams (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 51b1a14d7 -> 917898cdb
AMBARI-21394. Create a topic to send hostLevelParams (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/917898cd
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/917898cd
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/917898cd
Branch: refs/heads/branch-3.0-perf
Commit: 917898cdbed3bd37e1bbfa813b115f93dfb6dcaf
Parents: 51b1a14
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Jul 4 11:29:31 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Jul 4 11:29:31 2017 +0300
----------------------------------------------------------------------
.../ambari_agent/ClusterHostLevelParamsCache.py | 45 +++++++++++++++
.../src/main/python/ambari_agent/Constants.py | 4 +-
.../ambari_agent/CustomServiceOrchestrator.py | 4 +-
.../main/python/ambari_agent/HeartbeatThread.py | 14 +++--
.../python/ambari_agent/InitializerModule.py | 2 +
.../main/python/ambari_agent/RecoveryManager.py | 16 ++---
.../listeners/HostLevelParamsEventListener.py | 61 ++++++++++++++++++++
.../listeners/MetadataEventListener.py | 11 +---
.../ambari_agent/TestAgentStompResponses.py | 20 +++++--
.../dummy_files/stomp/host_level_params.json | 37 ++++++++++++
.../stomp/metadata_after_registration.json | 9 +--
.../dummy_files/stomp/topology_add_host.json | 5 +-
.../stomp/topology_cache_expected.json | 6 --
.../dummy_files/stomp/topology_create.json | 15 +----
14 files changed, 187 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
new file mode 100644
index 0000000..3e490c5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from ambari_agent.ClusterCache import ClusterCache
+import logging
+
+logger = logging.getLogger(__name__)
+
+class ClusterHostLevelParamsCache(ClusterCache):
+ """
+ Maintains an in-memory cache and disk cache of the host level params send from server for
+ every cluster. This is useful for having quick access to any of the
+ topology properties.
+
+ Host level params. Is parameters used by execution and status commands which can be generated
+ differently for every host.
+ """
+
+ def __init__(self, cluster_cache_dir):
+ """
+ Initializes the host level params cache.
+ :param cluster_cache_dir:
+ :return:
+ """
+ super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir)
+
+ def get_cache_name(self):
+ return 'host_level_params'
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 02945ee..17ed2be 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -21,16 +21,18 @@ limitations under the License.
COMMANDS_TOPIC = '/user/commands'
CONFIGURATIONS_TOPIC = '/user/configs'
+HOST_LEVEL_PARAMS_TOPIC = '/user/host_level_params'
METADATA_TOPIC = '/events/metadata'
TOPOLOGIES_TOPIC = '/events/topologies'
SERVER_RESPONSES_TOPIC = '/user/'
PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
-POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
+POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC]
TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
METADATA_REQUEST_ENDPOINT = '/agents/metadata'
CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
+HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params'
COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 6d1a491..c0b20ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -81,6 +81,7 @@ class CustomServiceOrchestrator():
self.metadata_cache = initializer_module.metadata_cache
self.topology_cache = initializer_module.topology_cache
self.configurations_cache = initializer_module.configurations_cache
+ self.host_level_params_cache = initializer_module.host_level_params_cache
self.config = initializer_module.config
self.tmp_dir = self.config.get('agent', 'prefix')
self.force_https_protocol = self.config.get_force_https_protocol()
@@ -459,13 +460,14 @@ class CustomServiceOrchestrator():
metadata_cache = self.metadata_cache[cluster_id]
configurations_cache = self.configurations_cache[cluster_id]
+ host_level_params_cache = self.host_level_params_cache[cluster_id]
component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
command_dict = {
'clusterLevelParams': metadata_cache.clusterLevelParams,
'serviceLevelParams': metadata_cache.serviceLevelParams[service_name],
- 'hostLevelParams': self.topology_cache.get_current_host_info(cluster_id).hostLevelParams,
+ 'hostLevelParams': host_level_params_cache,
'componentLevelParams': component_dict.componentLevelParams,
'script_type': self.SCRIPT_TYPE_PYTHON
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index dbf4006..6ba84e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -31,6 +31,7 @@ from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
+from ambari_agent.listeners.HostLevelParamsEventListener import HostLevelParamsEventListener
HEARTBEAT_INTERVAL = 10
REQUEST_RESPONSE_TIMEOUT = 10
@@ -49,20 +50,21 @@ class HeartbeatThread(threading.Thread):
self.registration_builder = Register(initializer_module.config)
self.initializer_module = initializer_module
- self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
# listeners
self.server_responses_listener = ServerResponsesListener()
self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
- self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.recovery_manager)
+ self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
- self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+ self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager)
+ self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener]
self.post_registration_requests = [
(Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener),
(Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener),
- (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener)
+ (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener),
+ (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener)
]
self.responseId = 0
@@ -152,6 +154,10 @@ class HeartbeatThread(threading.Thread):
else:
self.responseId = serverId
+ if 'restartAgent' in response and response['restartAgent'].lower() == "true":
+ logger.warn("Restarting the agent by the request from server")
+ Utils.restartAgent()
+
def get_heartbeat_body(self):
"""
Heartbeat body to be send to server
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 8de1fa5..dbc26e8 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -26,6 +26,7 @@ from ambari_agent.AmbariConfig import AmbariConfig
from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
+from ambari_agent.ClusterHostLevelParamsCache import ClusterHostLevelParamsCache
from ambari_agent.Utils import lazy_property
from ambari_agent.security import AmbariStompConnection
from ambari_agent.ActionQueue import ActionQueue
@@ -73,6 +74,7 @@ class InitializerModule:
self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config)
self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
+ self.host_level_params_cache = ClusterHostLevelParamsCache(self.cluster_cache_dir)
self.customServiceOrchestrator = CustomServiceOrchestrator(self)
self.recovery_manager = RecoveryManager(self.recovery_cache_dir)
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 68dd0be..19f163e 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -97,7 +97,6 @@ class RecoveryManager:
self.__cache_lock = threading.RLock()
self.active_command_count = 0
self.paused = False
- self.recovery_timestamp = -1
self.cluster_id = None
if not os.path.exists(cache_dir):
@@ -110,7 +109,7 @@ class RecoveryManager:
self.actions = {}
- self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "", -1)
+ self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "")
pass
@@ -560,8 +559,7 @@ class RecoveryManager:
"maxCount" : 10,
"windowInMinutes" : 60,
"retryGap" : 0,
- "components" : "a,b",
- "recoveryTimestamp" : 1458150424380
+ "components" : "a,b"
}
"""
@@ -573,7 +571,6 @@ class RecoveryManager:
retry_gap = 5
max_lifetime_count = 12
enabled_components = ""
- recovery_timestamp = -1 # Default value if recoveryTimestamp is not available.
if dictionary and "recoveryConfig" in dictionary:
@@ -599,11 +596,8 @@ class RecoveryManager:
if 'components' in config:
enabled_components = config['components']
- if 'recoveryTimestamp' in config:
- recovery_timestamp = config['recoveryTimestamp']
-
self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
- auto_install_start, enabled_components, recovery_timestamp)
+ auto_install_start, enabled_components)
pass
"""
@@ -617,10 +611,9 @@ class RecoveryManager:
auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
enabled_components - CSV of componenents enabled for auto start.
- recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up.
"""
def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
- auto_start_only, auto_install_start, enabled_components, recovery_timestamp):
+ auto_start_only, auto_install_start, enabled_components):
"""
Update recovery configuration, recovery is disabled if configuration values
are not correct
@@ -653,7 +646,6 @@ class RecoveryManager:
self.auto_install_start = auto_install_start
self.max_lifetime_count = max_lifetime_count
self.enabled_components = []
- self.recovery_timestamp = recovery_timestamp
self.allowed_desired_states = [self.STARTED, self.INSTALLED]
self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
new file mode 100644
index 0000000..5aee634
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class HostLevelParamsEventListener(EventListener):
+ """
+ Listener of Constants.HOST_LEVEL_PARAMS_TOPIC events from server.
+ """
+ def __init__(self, host_level_params_cache, recovery_manager):
+ self.host_level_params_cache = host_level_params_cache
+ self.recovery_manager = recovery_manager
+
+ def on_event(self, headers, message):
+ """
+ Is triggered when an event to Constants.CONFIGURATIONS_TOPIC topic is received from server.
+
+ @param headers: headers dictionary
+ @param message: message payload dictionary
+ """
+ # this kind of response is received if hash was identical. And server does not need to change anything
+ if message == {}:
+ return
+
+ self.host_level_params_cache.rewrite_cache(message['clusters'])
+ self.host_level_params_cache.hash = message['hash']
+
+ if message['clusters']:
+ # FIXME: Recovery manager does not support multiple cluster as of now.
+ cluster_id = message['clusters'].keys()[0]
+
+ if 'recoveryConfig' in message['clusters'][cluster_id]:
+ logging.info("Updating recoveryConfig from metadata")
+ self.recovery_manager.update_recovery_config(self.host_level_params_cache[cluster_id])
+ self.recovery_manager.cluster_id = cluster_id
+
+ def get_handled_path(self):
+ return Constants.HOST_LEVEL_PARAMS_TOPIC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index 364d8af..5802ffe 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -32,9 +32,8 @@ class MetadataEventListener(EventListener):
"""
Listener of Constants.METADATA_TOPIC events from server.
"""
- def __init__(self, metadata_cache, recovery_manager):
+ def __init__(self, metadata_cache):
self.metadata_cache = metadata_cache
- self.recovery_manager = recovery_manager
def on_event(self, headers, message):
"""
@@ -50,13 +49,5 @@ class MetadataEventListener(EventListener):
self.metadata_cache.rewrite_cache(message['clusters'])
self.metadata_cache.hash = message['hash']
- # FIXME: Recovery manager does not support multiple cluster as of now.
- cluster_id = message['clusters'].keys()[0]
-
- if 'recoveryConfig' in message['clusters'][cluster_id]:
- logging.info("Updating recoveryConfig from metadata")
- self.recovery_manager.update_recovery_config(self.metadata_cache[cluster_id])
- self.recovery_manager.cluster_id = cluster_id
-
def get_handled_path(self):
return Constants.METADATA_TOPIC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index c41f87e..feaf7dd 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -40,7 +40,7 @@ from mock.mock import MagicMock, patch
@patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org"))
class TestAgentStompResponses(BaseStompServerTestCase):
def setUp(self):
- self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
+ self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json'])
if not os.path.exists("/tmp/ambari-agent"):
os.mkdir("/tmp/ambari-agent")
@@ -83,9 +83,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body=self.get_json("configurations_update.json"))
self.server.topic_manager.send(f)
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=self.get_json("host_level_params.json"))
+ self.server.topic_manager.send(f)
+
initial_topology_request = self.server.frames_queue.get()
initial_metadata_request = self.server.frames_queue.get()
initial_configs_request = self.server.frames_queue.get()
+ initial_host_level_params_request = self.server.frames_queue.get()
while not initializer_module.is_registered:
time.sleep(0.1)
@@ -103,6 +107,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
configurations_subscribe_frame = self.server.frames_queue.get()
metadata_subscribe_frame = self.server.frames_queue.get()
topologies_subscribe_frame = self.server.frames_queue.get()
+ host_level_params_subscribe_frame = self.server.frames_queue.get()
heartbeat_frame = self.server.frames_queue.get()
dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
dn_install_failed_frame = json.loads(self.server.frames_queue.get().body)
@@ -116,7 +121,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
self.server.topic_manager.send(f)
command_status_reporter.join()
@@ -179,6 +184,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
self.server.topic_manager.send(f)
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
+ self.server.topic_manager.send(f)
+
commands_subscribe_frame = self.server.frames_queue.get()
configurations_subscribe_frame = self.server.frames_queue.get()
metadata_subscribe_frame = self.server.frames_queue.get()
@@ -188,7 +196,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
@@ -222,9 +230,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
self.server.topic_manager.send(f)
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
+ self.server.topic_manager.send(f)
+
initial_topology_request = self.server.frames_queue.get()
initial_metadata_request = self.server.frames_queue.get()
initial_configs_request = self.server.frames_queue.get()
+ initial_host_level_params_request = self.server.frames_queue.get()
while not initializer_module.is_registered:
time.sleep(0.1)
@@ -262,7 +274,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json
new file mode 100644
index 0000000..7447e93
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json
@@ -0,0 +1,37 @@
+{
+ "hash":"aa4cd03688d36f18fc2cbba06614bafe",
+ "clusters":{
+ "0":{
+ "repoInfo": [
+ {
+ "defaultBaseUrl": "http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3",
+ "ambariManagedRepositories": true,
+ "baseUrl": "http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/",
+ "latestBaseUrl": "http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129",
+ "repoSaved": true,
+ "repoName": "HDP",
+ "osType": "redhat6",
+ "unique": true,
+ "repoId": "HDP-2.6"
+ },
+ {
+ "defaultBaseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6",
+ "ambariManagedRepositories": true,
+ "baseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6",
+ "latestBaseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6",
+ "repoSaved": true,
+ "repoName": "HDP-UTILS",
+ "osType": "redhat6",
+ "unique": false,
+ "repoId": "HDP-UTILS-1.1.0.21"
+ }
+ ],
+ "recoveryConfig": {
+ "type" : "AUTO_INSTALL_START",
+ "maxCount" : 10,
+ "windowInMinutes" : 60,
+ "components" : "NAMENODE,DATANODE"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
index 6462ccf..c1aba28 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -38,14 +38,7 @@
},
"status_commands_to_run": [
"STATUS"
- ],
- "recoveryConfig": {
- "type" : "AUTO_INSTALL_START",
- "maxCount" : 10,
- "windowInMinutes" : 60,
- "components" : "NAMENODE,DATANODE",
- "recoveryTimestamp" : 1458150424380
- }
+ ]
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
index 2458f08..a9407c3 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
@@ -8,10 +8,7 @@
"hostId":2,
"hostName":"c6403.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.103",
- "hostLevelParams": {
- "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
- }
+ "ipv4":"192.168.64.103"
}
]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
index 53d0e0d..ff2b3fd 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
@@ -46,18 +46,12 @@
"hosts": [
{
"hostId": 1,
- "hostLevelParams": {
- "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
- },
"hostName": "c6402.ambari.apache.org",
"ipv4": "192.168.64.102",
"rackName": "/default-rack"
},
{
"hostId": 2,
- "hostLevelParams": {
- "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
- },
"hostName": "c6403.ambari.apache.org",
"ipv4": "192.168.64.103",
"rackName": "/default-rack"
http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
index dfe17b9..6df4bc3 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
@@ -50,19 +50,13 @@
"hostId":0,
"hostName":"c6401.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.101",
- "hostLevelParams": {
- "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
- }
+ "ipv4":"192.168.64.101"
},
{
"hostId":1,
"hostName":"c6402.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.102",
- "hostLevelParams": {
- "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
- }
+ "ipv4":"192.168.64.102"
}
]
},
@@ -87,10 +81,7 @@
"hostId":0,
"hostName":"c6401.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.101",
- "hostLevelParams": {
- "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
- }
+ "ipv4":"192.168.64.101"
}
]
}