You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/06/08 10:43:34 UTC
ambari git commit: AMBARI-21199. Run status commands with real
configurations and parameters information (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf b1d357ad1 -> c8aecb772
AMBARI-21199. Run status commands with real configurations and parameters information (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c8aecb77
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c8aecb77
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c8aecb77
Branch: refs/heads/branch-3.0-perf
Commit: c8aecb7721b574f0cc4604abcf93456ca4de93d6
Parents: b1d357a
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Jun 8 13:43:14 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Jun 8 13:43:14 2017 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 21 +-
.../main/python/ambari_agent/ClusterCache.py | 16 +-
.../python/ambari_agent/ClusterTopologyCache.py | 67 +++++-
.../ambari_agent/ComponentStatusExecutor.py | 32 +--
.../ambari_agent/CustomServiceOrchestrator.py | 123 +++++++----
.../src/main/python/ambari_agent/FileCache.py | 4 +-
.../python/ambari_agent/InitializerModule.py | 4 +-
.../src/main/python/ambari_agent/Utils.py | 2 +
.../ambari_agent/TestAgentStompResponses.py | 11 +-
.../stomp/configurations_update.json | 21 ++
.../dummy_files/stomp/execution_commands.json | 2 +-
.../stomp/metadata_after_registration.json | 220 ++++---------------
.../stomp/topology_add_component.json | 7 +-
.../dummy_files/stomp/topology_add_host.json | 5 +-
.../stomp/topology_cache_expected.json | 69 +++---
.../dummy_files/stomp/topology_create.json | 43 ++--
.../agent/stomp/AgentReportsController.java | 6 +-
17 files changed, 319 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index dbd9f4c..4cef88b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -27,12 +27,10 @@ import os
import ambari_simplejson as json
import time
import signal
-import copy
from AgentException import AgentException
from LiveStatus import LiveStatus
from ActualConfigHandler import ActualConfigHandler
-from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_commons.str_utils import split_on_chunks
from resource_management.libraries.script import Script
@@ -77,12 +75,11 @@ class ActionQueue(threading.Thread):
self.commandQueue = Queue.Queue()
self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = initializer_module.commandStatuses
- self.configurations_cache = initializer_module.configurations_cache
self.config = initializer_module.ambariConfig
self.configTags = {}
self.stop_event = initializer_module.stop_event
self.tmpdir = self.config.get('agent', 'prefix')
- self.customServiceOrchestrator = CustomServiceOrchestrator(self.config)
+ self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
self.parallel_execution = self.config.get_parallel_exec_option()
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
@@ -92,18 +89,16 @@ class ActionQueue(threading.Thread):
for command in commands:
if not command.has_key('serviceName'):
command['serviceName'] = "null"
+ if command.has_key('clusterId'):
+ command['clusterId'] = "null"
if not command.has_key('clusterName'):
command['clusterName'] = 'null'
-
- if command.has_key('clusterId'):
- cluster_id = command['clusterId']
- # TODO STOMP: what if has no configs yet?
- if cluster_id != 'null':
- command['configurations'] = dict(self.configurations_cache[str(cluster_id)])
+
+
logger.info("Adding " + command['commandType'] + " for role " + \
command['role'] + " for service " + \
- command['serviceName'] + " of cluster " + \
- command['clusterName'] + " to the queue.")
+ command['serviceName'] + " of cluster_id " + \
+ command['clusterId'] + " to the queue.")
if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
self.backgroundCommandQueue.put(self.createCommandHandle(command))
else:
@@ -170,7 +165,7 @@ class ActionQueue(threading.Thread):
except:
logger.exception("ActionQueue thread failed with exception:")
raise
-
+
logger.info("ActionQueue thread has successfully finished")
def processBackgroundQueueSafeEmpty(self):
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 8e91afe..a3b84ad 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -57,12 +57,12 @@ class ClusterCache(dict):
with self.__file_lock:
with open(self.__current_cache_json_file, 'r') as fp:
cache_dict = json.load(fp)
-
+ """
for cluster_id, cache in cache_dict.iteritems():
immutable_cache = Utils.make_immutable(cache)
cache_dict[cluster_id] = immutable_cache
-
- super(ClusterCache, self).__init__(cache_dict)
+ """
+ self.rewrite_cache(cache_dict)
def get_cluster_ids(self):
return self.keys()
@@ -80,6 +80,9 @@ class ClusterCache(dict):
for cache_id_to_delete in cache_ids_to_delete:
del self[cache_id_to_delete]
+ self.on_cache_update()
+ self.persist_cache()
+
def rewrite_cluster_cache(self, cluster_id, cache):
"""
@@ -98,8 +101,6 @@ class ClusterCache(dict):
with self._cache_lock:
self[cluster_id] = immutable_cache
- self.persist_cache()
-
def persist_cache(self):
# ensure that our cache directory exists
if not os.path.exists(self.cluster_cache_dir):
@@ -113,6 +114,11 @@ class ClusterCache(dict):
with self._cache_lock:
return Utils.get_mutable_copy(self)
+ def on_cache_update(self):
+ """
+ Callback function called when the cache is updated
+ """
+ pass
def get_cache_name(self):
raise NotImplemented()
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 5810e67..f138c57 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -18,7 +18,12 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+
+from ambari_agent import hostname
from ambari_agent.ClusterCache import ClusterCache
+from ambari_agent.Utils import ImmutableDictionary
+
+from collections import defaultdict
import logging
logger = logging.getLogger(__name__)
@@ -30,32 +35,72 @@ class ClusterTopologyCache(ClusterCache):
topology properties.
"""
- def __init__(self, cluster_cache_dir):
+ def __init__(self, cluster_cache_dir, config):
"""
Initializes the topology cache.
:param cluster_cache_dir:
:return:
"""
+ self.hosts_to_id = ImmutableDictionary({})
+ self.components_by_key = ImmutableDictionary({})
+ self.hostname = hostname.hostname(config)
+ self.current_host_ids_to_cluster = {}
super(ClusterTopologyCache, self).__init__(cluster_cache_dir)
def get_cache_name(self):
return 'topology'
- @staticmethod
- def find_host_by_id(host_dicts, host_id):
+ def on_cache_update(self):
+ hosts_to_id = defaultdict(lambda:{})
+ components_by_key = defaultdict(lambda:{})
+
+ for cluster_id, cluster_topology in self.iteritems():
+ for host_dict in cluster_topology.hosts:
+ hosts_to_id[cluster_id][host_dict.hostId] = host_dict
+
+ if host_dict.hostName == self.hostname:
+ self.current_host_ids_to_cluster[cluster_id] = host_dict.hostId
+
+ for component_dict in cluster_topology.components:
+ key = "{0}/{1}".format(component_dict.serviceName, component_dict.componentName)
+ components_by_key[cluster_id][key] = component_dict
+
+ self.hosts_to_id = ImmutableDictionary(hosts_to_id)
+ self.components_by_key = ImmutableDictionary(components_by_key)
+
+ def get_component_info_by_key(self, cluster_id, service_name, component_name):
+ """
+ Find component by service_name and component_name in list of component dictionaries.
+ """
+ key = "{0}/{1}".format(service_name, component_name)
+
+ try:
+ return self.components_by_key[cluster_id][key]
+ except KeyError:
+ return None
+
+ def get_host_info_by_id(self, cluster_id, host_id):
"""
Find host by id in list of host dictionaries.
"""
+ try:
+ return self.hosts_to_id[cluster_id][host_id]
+ except KeyError:
+ return None
+
+ def get_current_host_info(self, cluster_id):
+ current_host_id = self.current_host_ids_to_cluster[cluster_id]
+ return self.get_host_info_by_id(cluster_id, current_host_id)
+
+ @staticmethod
+ def _find_host_by_id_in_dict(host_dicts, host_id):
for host_dict in host_dicts:
if host_dict['hostId'] == host_id:
return host_dict
return None
@staticmethod
- def find_component(component_dicts, service_name, component_name):
- """
- Find component by service_name and component_name in list of component dictionaries.
- """
+ def _find_component_in_dict(component_dicts, service_name, component_name):
for component_dict in component_dicts:
if component_dict['serviceName'] == service_name and component_dict['componentName'] == component_name:
return component_dict
@@ -81,7 +126,7 @@ class ClusterTopologyCache(ClusterCache):
if 'hosts' in cluster_updates_dict:
hosts_mutable_list = mutable_dict[cluster_id]['hosts']
for host_updates_dict in cluster_updates_dict['hosts']:
- host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId'])
+ host_mutable_dict = ClusterTopologyCache._find_host_by_id_in_dict(hosts_mutable_list, host_updates_dict['hostId'])
if host_mutable_dict is not None:
host_mutable_dict.update(host_updates_dict)
else:
@@ -90,7 +135,7 @@ class ClusterTopologyCache(ClusterCache):
if 'components' in cluster_updates_dict:
components_mutable_list = mutable_dict[cluster_id]['components']
for component_updates_dict in cluster_updates_dict['components']:
- component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
+ component_mutable_dict = ClusterTopologyCache._find_component_in_dict(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
if component_mutable_dict is not None:
component_updates_dict['hostIds'] += component_mutable_dict['hostIds']
component_updates_dict['hostIds'] = list(set(component_updates_dict['hostIds']))
@@ -121,7 +166,7 @@ class ClusterTopologyCache(ClusterCache):
if 'hosts' in cluster_updates_dict:
hosts_mutable_list = mutable_dict[cluster_id]['hosts']
for host_updates_dict in cluster_updates_dict['hosts']:
- host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId'])
+ host_to_delete = ClusterTopologyCache._find_host_by_id_in_dict(hosts_mutable_list, host_updates_dict['hostId'])
if host_to_delete is not None:
mutable_dict[cluster_id]['hosts'] = [host_dict for host_dict in hosts_mutable_list if host_dict != host_to_delete]
else:
@@ -130,7 +175,7 @@ class ClusterTopologyCache(ClusterCache):
if 'components' in cluster_updates_dict:
components_mutable_list = mutable_dict[cluster_id]['components']
for component_updates_dict in cluster_updates_dict['components']:
- component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
+ component_mutable_dict = ClusterTopologyCache._find_component_in_dict(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
if 'hostIds' in component_mutable_dict:
exclude_host_ids = component_updates_dict['hostIds']
component_mutable_dict['hostIds'] = [host_id for host_id in component_mutable_dict['hostIds'] if host_id not in exclude_host_ids]
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 3a2e105..520c97d 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import random
import logging
import threading
@@ -32,6 +31,7 @@ class ComponentStatusExecutor(threading.Thread):
self.initializer_module = initializer_module
self.metadata_cache = initializer_module.metadata_cache
self.topology_cache = initializer_module.topology_cache
+ self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
self.stop_event = initializer_module.stop_event
self.reported_component_status = defaultdict(lambda:defaultdict(lambda:None)) # component statuses which were received by server
threading.Thread.__init__(self)
@@ -53,17 +53,14 @@ class ComponentStatusExecutor(threading.Thread):
# multithreading: if cluster was deleted during iteration
continue
- #if 'status_commands_to_run' in cluster_metadata:
- # continue
-
- #status_commands_to_run = cluster_metadata.status_commands_to_run
+ if not 'status_commands_to_run' in metadata_cache:
+ continue
- # TODO STOMP: read this from metadata
- status_commands_to_run = ['STATUS', 'SECURITY_STATUS']
+ status_commands_to_run = metadata_cache.status_commands_to_run
cluster_components = topology_cache.components
for component_dict in cluster_components:
- for command in status_commands_to_run:
+ for command_name in status_commands_to_run:
if self.stop_event.is_set():
break
@@ -71,19 +68,26 @@ class ComponentStatusExecutor(threading.Thread):
service_name = component_dict.serviceName
component_name = component_dict.componentName
- # TODO STOMP: run real command
- logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.service_package_folder, component_dict.statusCommandParams.script))
- #self.customServiceOrchestrator.requestComponentStatus(command)
- status = random.choice(["INSTALLED","STARTED"])
+ command_dict = {
+ 'serviceName': service_name,
+ 'role': component_name,
+ 'clusterId': cluster_id,
+ 'commandType': 'STATUS_COMMAND',
+ }
+
+ component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
+ logger.info(component_status_result)
+ status = "STARTED" if component_status_result['exitcode'] == 0 else "INSTALLED"
+
result = {
'serviceName': service_name,
'componentName': component_name,
- 'command': command,
+ 'command': command_name,
'status': status,
'clusterId': cluster_id,
}
- if status != self.reported_component_status[component_name][command]:
+ if status != self.reported_component_status[component_name][command_name]:
logging.info("Status for {0} has changed to {1}".format(component_name, status))
cluster_reports[cluster_id].append(result)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 656e9a1..2350504 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -24,11 +24,11 @@ import ambari_simplejson as json
import sys
from ambari_commons import shell
import threading
+import copy
from FileCache import FileCache
from AgentException import AgentException
from PythonExecutor import PythonExecutor
-from PythonReflectiveExecutor import PythonReflectiveExecutor
from resource_management.libraries.functions.log_process_information import log_process_information
from resource_management.core.utils import PasswordString
import subprocess
@@ -64,7 +64,6 @@ class CustomServiceOrchestrator():
FREQUENT_COMMANDS = [COMMAND_NAME_STATUS]
DONT_DEBUG_FAILURES_FOR_COMMANDS = FREQUENT_COMMANDS
- REFLECTIVELY_RUN_COMMANDS = FREQUENT_COMMANDS # -- commands which run a lot and often (this increases their speed)
DONT_BACKUP_LOGS_FOR_COMMANDS = FREQUENT_COMMANDS
# Path where hadoop credential JARS will be available
@@ -78,27 +77,30 @@ class CustomServiceOrchestrator():
# Property name for credential store class path
CREDENTIAL_STORE_CLASS_PATH_NAME = 'credentialStoreClassPath'
- def __init__(self, config):
- self.config = config
- self.tmp_dir = config.get('agent', 'prefix')
- self.force_https_protocol = config.get_force_https_protocol()
+ def __init__(self, initializer_module):
+ self.metadata_cache = initializer_module.metadata_cache
+ self.topology_cache = initializer_module.topology_cache
+ self.configurations_cache = initializer_module.configurations_cache
+ self.config = initializer_module.ambariConfig
+ self.tmp_dir = self.config.get('agent', 'prefix')
+ self.force_https_protocol = self.config.get_force_https_protocol()
self.exec_tmp_dir = Constants.AGENT_TMP_DIR
- self.file_cache = FileCache(config)
+ self.file_cache = FileCache(self.config)
self.status_commands_stdout = os.path.join(self.tmp_dir,
'status_command_stdout.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
'status_command_stderr.txt')
- self.public_fqdn = hostname.public_hostname(config)
+ self.public_fqdn = hostname.public_hostname(self.config)
# TODO STOMP: cache reset should be called on every agent registration
#controller.registration_listeners.append(self.file_cache.reset)
# Construct the hadoop credential lib JARs path
- self.credential_shell_lib_path = os.path.join(config.get('security', 'credential_lib_dir',
+ self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',
self.DEFAULT_CREDENTIAL_SHELL_LIB_PATH), '*')
- self.credential_conf_dir = config.get('security', 'credential_conf_dir', self.DEFAULT_CREDENTIAL_CONF_DIR)
+ self.credential_conf_dir = self.config.get('security', 'credential_conf_dir', self.DEFAULT_CREDENTIAL_CONF_DIR)
- self.credential_shell_cmd = config.get('security', 'credential_shell_cmd', self.DEFAULT_CREDENTIAL_SHELL_CMD)
+ self.credential_shell_cmd = self.config.get('security', 'credential_shell_cmd', self.DEFAULT_CREDENTIAL_SHELL_CMD)
# Clean up old status command files if any
try:
@@ -124,7 +126,7 @@ class CustomServiceOrchestrator():
.format(tid=str(task_id), reason=reason, pid=pid))
log_process_information(logger)
shell.kill_process_with_children(pid)
- else:
+ else:
logger.warn("Unable to find process associated with taskId = %s" % task_id)
def get_py_executor(self, forced_command_name):
@@ -132,10 +134,7 @@ class CustomServiceOrchestrator():
Wrapper for unit testing
:return:
"""
- if forced_command_name in self.REFLECTIVELY_RUN_COMMANDS:
- return PythonReflectiveExecutor(self.tmp_dir, self.config)
- else:
- return PythonExecutor(self.tmp_dir, self.config)
+ return PythonExecutor(self.tmp_dir, self.config)
def getProviderDirectory(self, service_name):
"""
@@ -245,7 +244,7 @@ class CustomServiceOrchestrator():
config.pop(value_name, None)
return configtype_credentials
- def generateJceks(self, commandJson):
+ def qJceks(self, commandJson):
"""
Generates the JCEKS file with passwords for the service specified in commandJson
@@ -302,25 +301,26 @@ class CustomServiceOrchestrator():
return cmd_result
- def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
- override_output_files=True, retry=False):
+ def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name=None,
+ override_output_files=True, retry=False, is_status_command=False):
"""
forced_command_name may be specified manually. In this case, value, defined at
command json, is ignored.
"""
try:
- script_type = command['commandParams']['script_type']
- script = command['commandParams']['script']
- timeout = int(command['commandParams']['command_timeout'])
+ command = self.generate_command(command_header)
+ script_type = command['script_type'] # TODO STOMP: take this from command?
+ script = command['componentLevelParams']['script']
+ timeout = int('300') # TODO STOMP: fix it
- if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
- server_url_prefix = command['hostLevelParams']['jdk_location']
- else:
- server_url_prefix = command['commandParams']['jdk_location']
+ server_url_prefix = command['clusterLevelParams']['jdk_location']
# Status commands have no taskId nor roleCommand
- task_id = command['taskId'] if 'taskId' in command else 'status'
- command_name = command['roleCommand'] if 'roleCommand' in command else None
+ if not is_status_command:
+ task_id = command['taskId']
+ command_name = command['roleCommand']
+ else:
+ task_id = 'status'
if forced_command_name is not None: # If not supplied as an argument
command_name = forced_command_name
@@ -335,7 +335,7 @@ class CustomServiceOrchestrator():
# forces a hash challenge on the directories to keep them updated, even
# if the return type is not used
- self.file_cache.get_host_scripts_base_dir(server_url_prefix)
+ self.file_cache.get_host_scripts_base_dir(server_url_prefix)
hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
self.file_cache.get_custom_resources_subdir(command, server_url_prefix)
@@ -361,10 +361,11 @@ class CustomServiceOrchestrator():
# If command contains credentialStoreEnabled, then
# generate the JCEKS file for the configurations.
credentialStoreEnabled = False
- if 'credentialStoreEnabled' in command:
- credentialStoreEnabled = (command['credentialStoreEnabled'] == "true")
+ if 'credentialStoreEnabled' in command['serviceLevelParams']:
+ credentialStoreEnabled = (command['serviceLevelParams']['credentialStoreEnabled'] == "true")
if credentialStoreEnabled == True:
+ # TODO STOMP: fix this with execution commands
if 'commandBeingRetried' not in command or command['commandBeingRetried'] != "true":
self.generateJceks(command)
else:
@@ -391,15 +392,15 @@ class CustomServiceOrchestrator():
python_executor = self.get_py_executor(forced_command_name)
backup_log_files = not command_name in self.DONT_BACKUP_LOGS_FOR_COMMANDS
log_out_files = self.config.get("logging","log_out_files", default="0") != "0"
-
+
for py_file, current_base_dir in filtered_py_file_list:
log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir,
self.force_https_protocol]
-
+
if log_out_files:
script_params.append("-o")
-
+
ret = python_executor.run_file(py_file, script_params,
tmpoutfile, tmperrfile, timeout,
tmpstrucoutfile, self.map_task_to_process,
@@ -451,7 +452,44 @@ class CustomServiceOrchestrator():
return "\nCommand aborted."
return None
- def requestComponentStatus(self, command):
+ def generate_command(self, command_header):
+ service_name = command_header['serviceName']
+ component_name = command_header['role']
+ cluster_id = str(command_header['clusterId'])
+
+ metadata_cache = self.metadata_cache[cluster_id]
+ configurations_cache = self.configurations_cache[cluster_id]
+
+ component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
+
+ command_dict = {
+ 'clusterLevelParams': metadata_cache.clusterLevelParams,
+ 'serviceLevelParams': metadata_cache.serviceLevelParams[service_name],
+ 'hostLevelParams': self.topology_cache.get_current_host_info(cluster_id).hostLevelParams,
+ 'componentLevelParams': component_dict.componentLevelParams,
+ 'script_type': self.SCRIPT_TYPE_PYTHON
+ }
+ command_dict.update(configurations_cache)
+ #command_dict['componentLevelParams']['script'] = component_dict.statusCommandParams['script']
+ #command_dict['serviceLevelParams']['hooks_folder'] = metadata_cache['hooks_folder']
+ #command_dict['serviceLevelParams']['service_package_folder'] = component_dict.statusCommandParams['service_package_folder']
+
+ command_dict['agentLevelParams'] = {
+ 'public_hostname': self.public_fqdn,
+ 'agentCacheDir': self.config.get('agent', 'cache_dir'),
+ }
+ command_dict['agentLevelParams']["agentConfigParams"] = {
+ "agent": {
+ "parallel_execution": self.config.get_parallel_exec_option(),
+ "use_system_proxy_settings": self.config.use_system_proxy_setting()
+ }
+ }
+ command = copy.copy(command_header)
+ command.update(command_dict)
+
+ return command
+
+ def requestComponentStatus(self, command_header):
"""
Component status is determined by exit code, returned by runCommand().
Exit code 0 means that component is running and any other exit code means that
@@ -461,9 +499,9 @@ class CustomServiceOrchestrator():
if logger.level == logging.DEBUG:
override_output_files = False
- res = self.runCommand(command, self.status_commands_stdout,
+ res = self.runCommand(command_header, self.status_commands_stdout,
self.status_commands_stderr, self.COMMAND_NAME_STATUS,
- override_output_files=override_output_files)
+ override_output_files=override_output_files, is_status_command=True)
return res
def resolve_script_path(self, base_dir, script):
@@ -497,17 +535,6 @@ class CustomServiceOrchestrator():
"""
Converts command to json file and returns file path
"""
- # Perform few modifications to stay compatible with the way in which
- public_fqdn = self.public_fqdn
- command['public_hostname'] = public_fqdn
- # Add cache dir to make it visible for commands
- command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir')
- command["agentConfigParams"] = {
- "agent": {
- "parallel_execution": self.config.get_parallel_exec_option(),
- "use_system_proxy_settings": self.config.use_system_proxy_setting()
- }
- }
# Now, dump the json file
command_type = command['commandType']
from ActionQueue import ActionQueue # To avoid cyclic dependency
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/FileCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py
index 139dcba..3bff613 100644
--- a/ambari-agent/src/main/python/ambari_agent/FileCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py
@@ -73,7 +73,7 @@ class FileCache():
"""
Returns a base directory for service
"""
- service_subpath = command['commandParams']['service_package_folder']
+ service_subpath = command['serviceLevelParams']['service_package_folder']
return self.provide_directory(self.cache_dir, service_subpath,
server_url_prefix)
@@ -83,7 +83,7 @@ class FileCache():
Returns a base directory for hooks
"""
try:
- hooks_subpath = command['commandParams']['hooks_folder']
+ hooks_subpath = command['serviceLevelParams']['hooks_folder']
except KeyError:
return None
subpath = os.path.join(self.STACKS_CACHE_DIRECTORY, hooks_subpath)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 4d0ac9b..88c8b91 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -30,6 +30,7 @@ from ambari_agent.Utils import lazy_property
from ambari_agent.security import AmbariStompConnection
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.CommandStatusDict import CommandStatusDict
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
logger = logging.getLogger()
@@ -66,8 +67,9 @@ class InitializerModule:
self.is_registered = False
self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
- self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
+ self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.ambariConfig)
self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
+ self.customServiceOrchestrator = CustomServiceOrchestrator(self)
self.commandStatuses = CommandStatusDict(self)
self.action_queue = ActionQueue(self)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 845eb30..de073bb 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -74,6 +74,8 @@ class BlockingDictionary():
class Utils(object):
@staticmethod
def make_immutable(value):
+ if isinstance(value, ImmutableDictionary):
+ return value
if isinstance(value, dict):
return ImmutableDictionary(value)
if isinstance(value, (list, tuple)):
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index c969c75..9d57261 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -35,6 +35,8 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
from mock.mock import MagicMock, patch
+@patch("socket.gethostbyname", new=MagicMock(return_value="192.168.64.101"))
+@patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org"))
class TestAgentStompResponses(BaseStompServerTestCase):
def setUp(self):
self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
@@ -106,7 +108,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
@@ -133,6 +135,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
heartbeat_thread.start()
+
action_queue = initializer_module.action_queue
action_queue.start()
@@ -168,7 +171,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
self.server.topic_manager.send(f)
heartbeat_thread.join()
@@ -240,7 +243,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
initializer_module.stop_event.set()
- f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
+ f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
self.server.topic_manager.send(f)
- heartbeat_thread.join()
\ No newline at end of file
+ heartbeat_thread.join()
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
index c415c7d..e8e1ab5 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
@@ -17,6 +17,27 @@
"initLimit":"10",
"dataDir":"/hadoop/zookeeper",
"tickTime":"2000"
+ },
+ "hadoop-env": {
+ "proxyuser_group": "users",
+ "hdfs_user_nproc_limit": "65536",
+ "hdfs_log_dir_prefix": "/var/log/hadoop",
+ "keyserver_host": " ",
+ "namenode_opt_maxnewsize": "200m",
+ "nfsgateway_heapsize": "1024",
+ "dtnode_heapsize": "1024m",
+ "namenode_heapsize": "1024m",
+ "namenode_opt_maxpermsize": "256m",
+ "namenode_opt_permsize": "128m",
+ "hdfs_tmp_dir": "/tmp",
+ "hdfs_user": "hdfs",
+ "hdfs_user_nofile_limit": "128000",
+ "namenode_opt_newsize": "200m",
+ "keyserver_port": "",
+ "namenode_backup_dir": "/tmp/upgrades",
+ "hadoop_root_logger": "INFO,RFA",
+ "hadoop_heapsize": "1024",
+ "hadoop_pid_dir_prefix": "/var/run/hadoop"
}
},
"configurationAttributes":{
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 536233d..6e84319 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -11,7 +11,7 @@
"commandType":"EXECUTION_COMMAND",
"roleCommand":"START",
"clusterName": "c1",
- "clusterId": 0,
+ "clusterId": "0",
"configuration_credentials":{
},
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
index 0dc5aff..f60b49a 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -1,185 +1,43 @@
{
- "hash": "c2bea6695221368416b2412fec2ba0d7",
- "clusters": {
- "0": {
- "serviceSpecifics": {
- "GANGLIA": {
- "version": "3.5.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": null
- },
- "DRUID": {
- "version": "0.9.2",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "TEZ": {
- "version": "0.7.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "SPARK": {
- "version": "1.6.x",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "HBASE": {
- "version": "1.1.2",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "RANGER_KMS": {
- "version": "0.7.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "ATLAS": {
- "version": "0.8.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "HIVE": {
- "version": "1.2.1000",
- "credentialStoreEnabled": true,
- "status_commands_timeout": null
- },
- "SLIDER": {
- "version": "0.92.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "AMBARI_INFRA": {
- "version": "0.1.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "FLUME": {
- "version": "1.5.2",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "MAHOUT": {
- "version": "0.9.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "SQOOP": {
- "version": "1.4.6",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "OOZIE": {
- "version": "4.2.0",
- "credentialStoreEnabled": true,
- "status_commands_timeout": 300
- },
- "HDFS": {
- "version": "2.7.3",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "MAPREDUCE2": {
- "version": "2.7.3",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "ACCUMULO": {
- "version": "1.7.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "ZOOKEEPER": {
- "version": "3.4.6",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "YARN": {
- "version": "2.7.3",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "KERBEROS": {
- "version": "1.10.3-10",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "KNOX": {
- "version": "0.12.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "PIG": {
- "version": "0.16.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "STORM": {
- "version": "1.1.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "RANGER": {
- "version": "0.7.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "AMBARI_METRICS": {
- "version": "0.1.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 600
- },
- "ZEPPELIN": {
- "version": "0.7.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "KAFKA": {
- "version": "0.10.1",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "LOGSEARCH": {
- "version": "0.5.0",
- "credentialStoreEnabled": true,
- "status_commands_timeout": 300
- },
- "FALCON": {
- "version": "0.10.0",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- },
- "SPARK2": {
- "version": "2.x",
- "credentialStoreEnabled": false,
- "status_commands_timeout": 300
- }
- },
- "clusterLevelParams": {
- "host_sys_prepped": "false",
- "java_home": null,
- "agent_stack_retry_count": "5",
- "jdk_location": "http://c6401.ambari.apache.org:8080/resources/",
- "jdk_name": null,
- "stack_version": "2.6",
- "user_list": "[\"accumulo\",\"zookeeper\",\"ams\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]",
- "mysql_jdbc_url": "http://dvitiiuk-System-Product-Name:8080/resources//mysql-connector-java.jar",
- "oracle_jdbc_url": "http://dvitiiuk-System-Product-Name:8080/resources//ojdbc6.jar",
- "ambari_db_rca_password": "mapred",
- "jce_name": null,
- "group_list": "[\"hadoop\",\"users\"]",
- "db_name": "ambari",
- "ambari_db_rca_driver": "org.postgresql.Driver",
- "ambari_db_rca_username": "mapred",
- "java_version": "8",
- "not_managed_hdfs_path_list": "[\"/mr-history/done\",\"/app-logs\",\"/tmp\"]",
- "db_driver_filename": "mysql-connector-java.jar",
- "stack_name": "HDP",
- "ambari_db_rca_url": "jdbc:postgresql://c6401.ambari.apache.org/ambarirca",
- "agent_stack_retry_on_unavailability": "false",
- "user_groups": "{}"
- },
- "status_commands_to_run": ["STATUS"],
- "hooks_folder": "HDP/2.0.6/hooks"
+ "hash": "c2bea6695221368416b2412fec2ba0d7",
+ "clusters": {
+ "0": {
+ "clusterLevelParams": {
+ "jdk_location": "http://gc6401:8080/resources/",
+ "not_managed_hdfs_path_list": "[\"/mr-history/done\",\"/app-logs\",\"/tmp\"]",
+ "agent_stack_retry_on_unavailability": "false",
+ "ambari_db_rca_url": "jdbc:postgresql://gc6401/ambarirca",
+ "stack_name": "HDP",
+ "java_version": "8",
+ "ambari_db_rca_password": "mapred",
+ "group_list": "[\"hadoop\",\"users\"]",
+ "host_sys_prepped": "false",
+ "oracle_jdbc_url": "http://gc6401:8080/resources//ojdbc6.jar",
+ "jdk_name": "jdk-8u112-linux-x64.tar.gz",
+ "ambari_db_rca_username": "mapred",
+ "mysql_jdbc_url": "http://gc6401:8080/resources//mysql-connector-java.jar",
+ "agent_stack_retry_count": "5",
+ "db_driver_filename": "mysql-connector-java.jar",
+ "jce_name": "jce_policy-8.zip",
+ "user_groups": "{}",
+ "stack_version": "2.6",
+ "db_name": "ambari",
+ "ambari_db_rca_driver": "org.postgresql.Driver",
+ "java_home": "/usr/jdk64/jdk1.8.0_112",
+ "user_list": "[\"zookeeper\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]",
+ "hooks_folder": "HDP/2.0.6/hooks"
+ },
+ "serviceLevelParams": {
+ "HDFS": {
+ "credentialStoreEnabled": false,
+ "status_commands_timeout": 300,
+ "version": "2.7.3",
+ "service_package_folder": "common-services/HDFS/2.1.0.2.0/package"
}
+ },
+ "status_commands_to_run": [
+ "STATUS"
+ ]
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
index 2c37111..1514516 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
@@ -12,9 +12,10 @@
0,
1
],
- "statusCommandParams":{
- "script":"scripts/snamenode.py",
- "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+ "componentLevelParams": {
+ "unlimited_key_jce_required": "false",
+ "clientsToUpdateConfigs": "[\"*\"]",
+ "script":"scripts/snamenode.py"
}
}
]
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
index a9407c3..2458f08 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
@@ -8,7 +8,10 @@
"hostId":2,
"hostName":"c6403.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.103"
+ "ipv4":"192.168.64.103",
+ "hostLevelParams": {
+ "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
+ }
}
]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
index 9894420..53d0e0d 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
@@ -1,57 +1,66 @@
{
"0": {
- "hosts": [
- {
- "rackName": "/default-rack",
- "hostName": "c6402.ambari.apache.org",
- "ipv4": "192.168.64.102",
- "hostId": 1
- },
- {
- "rackName": "/default-rack",
- "hostName": "c6403.ambari.apache.org",
- "ipv4": "192.168.64.103",
- "hostId": 2
- }
- ],
"components": [
{
- "statusCommandParams": {
- "script": "scripts/datanode.py",
- "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package"
+ "componentLevelParams": {
+ "clientsToUpdateConfigs": "[\"*\"]",
+ "script": "scripts/namenode.py",
+ "unlimited_key_jce_required": "false"
},
"componentName": "DATANODE",
- "serviceName": "HDFS",
- "version": "2.6.0.3-8",
"hostIds": [
1
- ]
+ ],
+ "serviceName": "HDFS",
+ "version": "2.6.0.3-8"
},
{
- "statusCommandParams": {
+ "componentLevelParams": {
+ "clientsToUpdateConfigs": "[\"*\"]",
"script": "scripts/hdfs_client.py",
- "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package"
+ "unlimited_key_jce_required": "false"
},
"componentName": "HDFS_CLIENT",
- "version": "2.6.0.3-8",
- "serviceName": "HDFS",
"hostIds": [
0,
1
- ]
+ ],
+ "serviceName": "HDFS",
+ "version": "2.6.0.3-8"
},
{
- "statusCommandParams": {
+ "componentLevelParams": {
+ "clientsToUpdateConfigs": "[\"*\"]",
"script": "scripts/snamenode.py",
- "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package"
+ "unlimited_key_jce_required": "false"
},
"componentName": "SECONDARY_NAMENODE",
- "serviceName": "HDFS",
- "version": "2.6.0.3-8",
"hostIds": [
0,
1
- ]
+ ],
+ "serviceName": "HDFS",
+ "version": "2.6.0.3-8"
+ }
+ ],
+ "hosts": [
+ {
+ "hostId": 1,
+ "hostLevelParams": {
+ "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
+ },
+ "hostName": "c6402.ambari.apache.org",
+ "ipv4": "192.168.64.102",
+ "rackName": "/default-rack"
+ },
+ {
+ "hostId": 2,
+ "hostLevelParams": {
+ "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
+ },
+ "hostName": "c6403.ambari.apache.org",
+ "ipv4": "192.168.64.103",
+ "rackName": "/default-rack"
}
]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
index cf1afa7..dfe17b9 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
@@ -11,9 +11,10 @@
"hostIds":[
0
],
- "statusCommandParams":{
- "script":"scripts/namenode.py",
- "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+ "componentLevelParams": {
+ "unlimited_key_jce_required": "false",
+ "clientsToUpdateConfigs": "[\"*\"]",
+ "script": "scripts/namenode.py"
}
},
{
@@ -24,9 +25,10 @@
0,
1
],
- "statusCommandParams":{
- "script":"scripts/datanode.py",
- "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+ "componentLevelParams": {
+ "unlimited_key_jce_required": "false",
+ "clientsToUpdateConfigs": "[\"*\"]",
+ "script": "scripts/namenode.py"
}
},
{
@@ -36,9 +38,10 @@
"hostIds":[
0
],
- "statusCommandParams":{
- "script":"scripts/hdfs_client.py",
- "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+ "componentLevelParams": {
+ "unlimited_key_jce_required": "false",
+ "clientsToUpdateConfigs": "[\"*\"]",
+ "script": "scripts/hdfs_client.py"
}
}
],
@@ -47,13 +50,19 @@
"hostId":0,
"hostName":"c6401.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.101"
+ "ipv4":"192.168.64.101",
+ "hostLevelParams": {
+ "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
+ }
},
{
"hostId":1,
"hostName":"c6402.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.102"
+ "ipv4":"192.168.64.102",
+ "hostLevelParams": {
+ "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
+ }
}
]
},
@@ -66,9 +75,10 @@
"hostIds":[
0
],
- "statusCommandParams":{
- "script":"scripts/namenode.py",
- "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+ "componentLevelParams": {
+ "unlimited_key_jce_required": "false",
+ "clientsToUpdateConfigs": "[\"*\"]",
+ "script": "scripts/namenode.py"
}
}
],
@@ -77,7 +87,10 @@
"hostId":0,
"hostName":"c6401.ambari.apache.org",
"rackName":"/default-rack",
- "ipv4":"192.168.64.101"
+ "ipv4":"192.168.64.101",
+ "hostLevelParams": {
+ "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
+ }
}
]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c8aecb77/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 68b7f3b..60bd197 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -66,11 +66,7 @@ public class AgentReportsController {
componentStatus.setClusterName(clusters.getCluster(report.getClusterId()).getClusterName());
componentStatus.setComponentName(report.getComponentName());
componentStatus.setServiceName(report.getServiceName());
- if (report.getCommand().equals(ComponentStatusReport.CommandStatusCommand.STATUS)) {
- componentStatus.setStatus(report.getStatus().toString());
- } else {
- componentStatus.setSecurityState(report.getStatus().toString());
- }
+ componentStatus.setStatus(report.getStatus().toString());
statuses.add(componentStatus);
}
}