You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/08/08 11:05:30 UTC
[ambari] branch branch-2.7 updated: AMBARI-24416. Component
Versions Are Not Reported On Initial Status Commands Anymore (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 88e7148 AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk)
88e7148 is described below
commit 88e7148b192a37a131b0a11343b06de8cb5d8095
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Aug 7 22:27:47 2018 +0300
AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk)
---
.../ambari_agent/ComponentVersionReporter.py | 109 +++++++++++++++++++++
.../src/main/python/ambari_agent/Constants.py | 1 +
.../ambari_agent/CustomServiceOrchestrator.py | 21 ++--
.../main/python/ambari_agent/HeartbeatThread.py | 7 ++
.../main/python/ambari_agent/models/commands.py | 1 +
.../resource_management/libraries/script/script.py | 9 +-
.../apache/ambari/server/agent/AgentReport.java | 36 +++----
.../ambari/server/agent/AgentReportsProcessor.java | 14 +--
...ntReport.java => CommandStatusAgentReport.java} | 35 ++-----
.../server/agent/ComponentStatusAgentReport.java | 36 +++++++
.../server/agent/ComponentVersionAgentReport.java | 36 +++++++
.../ambari/server/agent/HeartBeatHandler.java | 5 +
.../ambari/server/agent/HeartbeatProcessor.java | 73 ++++++++++++--
...AgentReport.java => HostStatusAgentReport.java} | 36 ++-----
.../server/agent/stomp/AgentReportsController.java | 27 +++--
.../agent/stomp/dto/ComponentVersionReport.java | 68 +++++++++++++
.../agent/stomp/dto/ComponentVersionReports.java | 45 +++++++++
17 files changed, 443 insertions(+), 116 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py b/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py
new file mode 100644
index 0000000..eb4358b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py
@@ -0,0 +1,109 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import logging
+import threading
+
+from ambari_agent import Constants
+from collections import defaultdict
+
+from ambari_agent.models.commands import AgentCommand
+
+logger = logging.getLogger(__name__)
+
+class ComponentVersionReporter(threading.Thread):
+ def __init__(self, initializer_module):
+ self.initializer_module = initializer_module
+ self.topology_cache = initializer_module.topology_cache
+ self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
+ self.server_responses_listener = initializer_module.server_responses_listener
+ threading.Thread.__init__(self)
+
+ def run(self):
+ """
+ Get the version of every component installed on the current host by running the get_version command, then send the collected reports to the server.
+ """
+ try:
+ cluster_reports = defaultdict(lambda:[])
+
+ for cluster_id in self.topology_cache.get_cluster_ids():
+ topology_cache = self.topology_cache[cluster_id]
+
+ if 'components' not in topology_cache:
+ continue
+
+ current_host_id = self.topology_cache.get_current_host_id(cluster_id)
+
+ if current_host_id is None:
+ continue
+
+ cluster_components = topology_cache.components
+ for component_dict in cluster_components:
+ # check if component is installed on current host
+ if current_host_id not in component_dict.hostIds:
+ continue
+
+ service_name = component_dict.serviceName
+ component_name = component_dict.componentName
+
+ result = self.check_component_version(cluster_id, service_name, component_name)
+
+ if result:
+ cluster_reports[cluster_id].append(result)
+
+ self.send_updates_to_server(cluster_reports)
+ except:
+ logger.exception("Exception in ComponentVersionReporter")
+
+ def check_component_version(self, cluster_id, service_name, component_name):
+ """
+ Return a version report dict for the component, or None if it is not a known component or its version could not be obtained.
+ """
+ # not a component known to the topology cache
+ if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None:
+ return None
+
+ command_dict = {
+ 'serviceName': service_name,
+ 'role': component_name,
+ 'clusterId': cluster_id,
+ 'commandType': AgentCommand.get_version,
+ }
+
+ version_result = self.customServiceOrchestrator.requestComponentStatus(command_dict, command_name=AgentCommand.get_version)
+
+ if version_result['exitcode'] or not 'structuredOut' in version_result or not 'version' in version_result['structuredOut']:
+ logger.error("Could not get version for component {0} of {1} service cluster_id={2}. Command returned: {3}".format(component_name, service_name, cluster_id, version_result))
+ return None
+
+ # TODO: check the case where structuredOut or version is missing (the condition above already appears to handle it)
+
+ result = {
+ 'serviceName': service_name,
+ 'componentName': component_name,
+ 'version': version_result['structuredOut']['version'],
+ 'clusterId': cluster_id,
+ }
+
+ return result
+
+ def send_updates_to_server(self, cluster_reports):
+ if not cluster_reports or not self.initializer_module.is_registered:
+ return
+
+ self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_VERSION_REPORTS_ENDPOINT)
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index ed6b482..91141a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -38,6 +38,7 @@ CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params'
ALERTS_DEFINITIONS_REQUEST_ENDPOINT = '/agents/alert_definitions'
COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
+COMPONENT_VERSION_REPORTS_ENDPOINT = '/reports/component_version'
COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'
ALERTS_STATUS_REPORTS_ENDPOINT = '/reports/alerts_status'
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 1dd4fa0..41f18e5 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -93,6 +93,8 @@ class CustomServiceOrchestrator(object):
'status_command_stdout_{0}.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
'status_command_stderr_{0}.txt')
+ self.status_structured_out = os.path.join(self.tmp_dir,
+ 'status_structured-out-{0}.json')
# Construct the hadoop credential lib JARs path
self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',
@@ -307,7 +309,7 @@ class CustomServiceOrchestrator(object):
return cmd_result
def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name=None,
- override_output_files=True, retry=False, is_status_command=False):
+ override_output_files=True, retry=False, is_status_command=False, tmpstrucoutfile=None):
"""
forced_command_name may be specified manually. In this case, value, defined at
command json, is ignored.
@@ -348,7 +350,8 @@ class CustomServiceOrchestrator(object):
script_path = self.resolve_script_path(base_dir, script)
script_tuple = (script_path, base_dir)
- tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id))
+ if not tmpstrucoutfile:
+ tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id))
# We don't support anything else yet
if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
@@ -374,7 +377,7 @@ class CustomServiceOrchestrator(object):
else:
logger.info("Skipping generation of jceks files as this is a retry of the command")
- json_path = self.dump_command_to_json(command, retry)
+ json_path = self.dump_command_to_json(command, retry, is_status_command)
hooks = self.hooks_orchestrator.resolve_hooks(command, command_name)
""":type hooks ambari_agent.CommandHooksOrchestrator.ResolvedHooks"""
@@ -508,7 +511,7 @@ class CustomServiceOrchestrator(object):
return command
- def requestComponentStatus(self, command_header):
+ def requestComponentStatus(self, command_header, command_name="STATUS"):
"""
Component status is determined by exit code, returned by runCommand().
Exit code 0 means that component is running and any other exit code means that
@@ -521,11 +524,13 @@ class CustomServiceOrchestrator(object):
# make sure status commands that run in parallel don't use the same files
status_commands_stdout = self.status_commands_stdout.format(uuid.uuid4())
status_commands_stderr = self.status_commands_stderr.format(uuid.uuid4())
+ status_structured_out = self.status_structured_out.format(uuid.uuid4())
try:
res = self.runCommand(command_header, status_commands_stdout,
- status_commands_stderr, self.COMMAND_NAME_STATUS,
- override_output_files=override_output_files, is_status_command=True)
+ status_commands_stderr, command_name,
+ override_output_files=override_output_files, is_status_command=True,
+ tmpstrucoutfile=status_structured_out)
finally:
try:
os.unlink(status_commands_stdout)
@@ -545,14 +550,14 @@ class CustomServiceOrchestrator(object):
raise AgentException(message)
return path
- def dump_command_to_json(self, command, retry=False):
+ def dump_command_to_json(self, command, retry=False, is_status_command=False):
"""
Converts command to json file and returns file path
"""
# Now, dump the json file
command_type = command['commandType']
- if command_type == AgentCommand.status:
+ if is_status_command:
# make sure status commands that run in parallel don't use the same files
file_path = os.path.join(self.tmp_dir, "status_command_{0}.json".format(uuid.uuid4()))
else:
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 3403bb2..2d4e06b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -27,6 +27,7 @@ from ambari_agent import Constants
from ambari_agent.Register import Register
from ambari_agent.Utils import BlockingDictionary
from ambari_agent.Utils import Utils
+from ambari_agent.ComponentVersionReporter import ComponentVersionReporter
from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
@@ -146,14 +147,20 @@ class HeartbeatThread(threading.Thread):
self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
self.run_post_registration_actions()
+
self.initializer_module.is_registered = True
# now when registration is done we can expose connection to other threads.
self.initializer_module._connection = self.connection
+ self.report_components_initial_versions()
+
def run_post_registration_actions(self):
for post_registration_action in self.post_registration_actions:
post_registration_action()
+ def report_components_initial_versions(self):
+ ComponentVersionReporter(self.initializer_module).start()
+
def unregister(self):
"""
Disconnect and remove connection object from initializer_module so other threads cannot use it
diff --git a/ambari-agent/src/main/python/ambari_agent/models/commands.py b/ambari-agent/src/main/python/ambari_agent/models/commands.py
index eb96e9a..e9ea2c8 100644
--- a/ambari-agent/src/main/python/ambari_agent/models/commands.py
+++ b/ambari-agent/src/main/python/ambari_agent/models/commands.py
@@ -19,6 +19,7 @@ limitations under the License.
class AgentCommand(object):
status = "STATUS_COMMAND"
+ get_version = "GET_VERSION"
execution = "EXECUTION_COMMAND"
auto_execution = "AUTO_EXECUTION_COMMAND"
background_execution = "BACKGROUND_EXECUTION_COMMAND"
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 556412a..52287a4 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -260,10 +260,8 @@ class Script(object):
stack_version_unformatted = str(default("/clusterLevelParams/stack_version", ""))
stack_version_formatted = format_stack_version(stack_version_unformatted)
if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
- if command_name.lower() == "status":
- request_version = default("/commandParams/request_version", None)
- if request_version is not None:
- return True
+ if command_name.lower() == "get_version":
+ return True
else:
# Populate version only on base commands
return command_name.lower() == "start" or command_name.lower() == "install" or command_name.lower() == "restart"
@@ -362,6 +360,9 @@ class Script(object):
if self.should_expose_component_version(self.command_name):
self.save_component_version_to_structured_out(self.command_name)
+ def get_version(self, env):
+ pass
+
def execute_prefix_function(self, command_name, afix, env):
"""
Execute action afix (prefix or suffix) based on command_name and afix type
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
index 817a238..1ec7028 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,37 +17,25 @@
*/
package org.apache.ambari.server.agent;
-import java.util.List;
+import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+public abstract class AgentReport<R> {
-public class AgentReport {
+ private final String hostName;
+ private final R report;
- private String hostName;
- private List<ComponentStatus> componentStatuses;
- private List<CommandReport> reports;
- private HostStatusReport hostStatusReport;
-
- public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
+ public AgentReport(String hostName, R report) {
this.hostName = hostName;
- this.componentStatuses = componentStatuses;
- this.reports = reports;
- this.hostStatusReport = hostStatusReport;
+ this.report = report;
}
public String getHostName() {
return hostName;
}
- public List<ComponentStatus> getComponentStatuses() {
- return componentStatuses;
- }
-
- public List<CommandReport> getCommandReports() {
- return reports;
+ public final void process() throws AmbariException {
+ process(report, hostName);
}
- public HostStatusReport getHostStatusReport() {
- return hostStatusReport;
- }
+ protected abstract void process(R report, String hostName) throws AmbariException;
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
index ad5c6aa..7a2ec3a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
@@ -49,9 +49,6 @@ public class AgentReportsProcessor {
}
@Inject
- private HeartBeatHandler hh;
-
- @Inject
private UnitOfWork unitOfWork;
@Inject
@@ -77,17 +74,8 @@ public class AgentReportsProcessor {
public void run() {
try {
unitOfWork.begin();
- String hostName = agentReport.getHostName();
try {
-
- //TODO rewrite with polymorphism usage.
- if (agentReport.getCommandReports() != null) {
- hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName);
- } else if (agentReport.getComponentStatuses() != null) {
- hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName);
- } else if (agentReport.getHostStatusReport() != null) {
- hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName);
- }
+ agentReport.process();
} catch (AmbariException e) {
LOG.error("Error processing agent reports", e);
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java
similarity index 50%
copy from ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
copy to ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java
index 817a238..04587ae 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java
@@ -19,35 +19,18 @@ package org.apache.ambari.server.agent;
import java.util.List;
-import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+import org.apache.ambari.server.AmbariException;
-public class AgentReport {
+public class CommandStatusAgentReport extends AgentReport<List<CommandReport>> {
+ private final HeartBeatHandler hh;
- private String hostName;
- private List<ComponentStatus> componentStatuses;
- private List<CommandReport> reports;
- private HostStatusReport hostStatusReport;
-
- public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
- this.hostName = hostName;
- this.componentStatuses = componentStatuses;
- this.reports = reports;
- this.hostStatusReport = hostStatusReport;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public List<ComponentStatus> getComponentStatuses() {
- return componentStatuses;
- }
-
- public List<CommandReport> getCommandReports() {
- return reports;
+ public CommandStatusAgentReport(HeartBeatHandler hh, String hostName, List<CommandReport> commandReports) {
+ super(hostName, commandReports);
+ this.hh = hh;
}
- public HostStatusReport getHostStatusReport() {
- return hostStatusReport;
+ @Override
+ protected void process(List<CommandReport> report, String hostName) throws AmbariException {
+ hh.handleCommandReportStatus(report, hostName);
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java
new file mode 100644
index 0000000..e6a3813
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.List;
+
+import org.apache.ambari.server.AmbariException;
+
+public class ComponentStatusAgentReport extends AgentReport<List<ComponentStatus>> {
+ private final HeartBeatHandler hh;
+
+ public ComponentStatusAgentReport(HeartBeatHandler hh, String hostName, List<ComponentStatus> componentStatuses) {
+ super(hostName, componentStatuses);
+ this.hh = hh;
+ }
+
+ @Override
+ protected void process(List<ComponentStatus> report, String hostName) throws AmbariException {
+ hh.handleComponentReportStatus(report, hostName);
+ }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java
new file mode 100644
index 0000000..405d34a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
+
+public class ComponentVersionAgentReport extends AgentReport<ComponentVersionReports> {
+ private final HeartBeatHandler hh;
+
+ public ComponentVersionAgentReport(HeartBeatHandler hh, String hostName,
+ ComponentVersionReports componentVersionReports) {
+ super(hostName, componentVersionReports);
+ this.hh = hh;
+ }
+
+ @Override
+ protected void process(ComponentVersionReports report, String hostName) throws AmbariException {
+ hh.handleComponentVersionReports(report, hostName);
+ }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 7d70390..1c225a9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotFoundException;
import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
@@ -256,6 +257,10 @@ public class HeartBeatHandler {
}
}
+ public void handleComponentVersionReports(ComponentVersionReports componentVersionReports, String hostname) throws AmbariException {
+ heartbeatProcessor.processVersionReports(componentVersionReports, hostname);
+ }
+
protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
LOG.debug("Received recovery report: {}", recoveryReport);
Host host = clusterFsm.getHost(hostname);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
index ffbf3f3..e6b1937 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -41,6 +41,8 @@ import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReport;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.MaintenanceStateHelper;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
@@ -550,6 +552,70 @@ public class HeartbeatProcessor extends AbstractService{
}
/**
+ * Process reports of component versions
+ * @throws AmbariException
+ */
+ public void processVersionReports(ComponentVersionReports versionReports, String hostname) throws AmbariException {
+ Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
+ for (Cluster cl : clusters) {
+ for (Map.Entry<String, List<ComponentVersionReport>> status : versionReports
+ .getComponentVersionReports().entrySet()) {
+ if (Long.valueOf(status.getKey()).equals(cl.getClusterId())) {
+ for (ComponentVersionReport versionReport : status.getValue()) {
+ try {
+ Service svc = cl.getService(versionReport.getServiceName());
+
+ String componentName = versionReport.getComponentName();
+ if (svc.getServiceComponents().containsKey(componentName)) {
+ ServiceComponent svcComp = svc.getServiceComponent(
+ componentName);
+ ServiceComponentHost scHost = svcComp.getServiceComponentHost(
+ hostname);
+
+ String version = versionReport.getVersion();
+
+ HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl,
+ scHost, version);
+ versionEventPublisher.publish(event);
+ }
+ } catch (ServiceNotFoundException e) {
+ LOG.warn("Received a version report for a non-initialized"
+ + " service"
+ + ", clusterId=" + versionReport.getClusterId()
+ + ", serviceName=" + versionReport.getServiceName());
+ continue;
+ } catch (ServiceComponentNotFoundException e) {
+ LOG.warn("Received a version report for a non-initialized"
+ + " servicecomponent"
+ + ", clusterId=" + versionReport.getClusterId()
+ + ", serviceName=" + versionReport.getServiceName()
+ + ", componentName=" + versionReport.getComponentName());
+ continue;
+ } catch (ServiceComponentHostNotFoundException e) {
+ LOG.warn("Received a version report for a non-initialized"
+ + " hostcomponent"
+ + ", clusterId=" + versionReport.getClusterId()
+ + ", serviceName=" + versionReport.getServiceName()
+ + ", componentName=" + versionReport.getComponentName()
+ + ", hostname=" + hostname);
+ continue;
+ } catch (RuntimeException e) {
+ LOG.warn("Received a version report with invalid payload"
+ + " service"
+ + ", clusterId=" + versionReport.getClusterId()
+ + ", serviceName=" + versionReport.getServiceName()
+ + ", componentName=" + versionReport.getComponentName()
+ + ", hostname=" + hostname
+ + ", error=" + e.getMessage());
+ continue;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Process reports of status commands
* @throws AmbariException
*/
@@ -596,13 +662,6 @@ public class HeartbeatProcessor extends AbstractService{
List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
scHost.setProcesses(list);
}
- if (extra.containsKey("version")) {
- String version = extra.get("version").toString();
-
- HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl, scHost, version);
- versionEventPublisher.publish(event);
- }
-
} catch (Exception e) {
LOG.error("Could not access extra JSON for " +
scHost.getServiceComponentName() + " from " +
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java
similarity index 53%
copy from ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
copy to ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java
index 817a238..a85f961 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java
@@ -17,37 +17,19 @@
*/
package org.apache.ambari.server.agent;
-import java.util.List;
-
+import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
-public class AgentReport {
-
- private String hostName;
- private List<ComponentStatus> componentStatuses;
- private List<CommandReport> reports;
- private HostStatusReport hostStatusReport;
-
- public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
- this.hostName = hostName;
- this.componentStatuses = componentStatuses;
- this.reports = reports;
- this.hostStatusReport = hostStatusReport;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public List<ComponentStatus> getComponentStatuses() {
- return componentStatuses;
- }
+public class HostStatusAgentReport extends AgentReport<HostStatusReport> {
+ private final HeartBeatHandler hh;
- public List<CommandReport> getCommandReports() {
- return reports;
+ public HostStatusAgentReport(HeartBeatHandler hh, String hostName, HostStatusReport hostStatusReport) {
+ super(hostName, hostStatusReport);
+ this.hh = hh;
}
- public HostStatusReport getHostStatusReport() {
- return hostStatusReport;
+ @Override
+ protected void process(HostStatusReport report, String hostName) throws AmbariException {
+ hh.handleHostReportStatus(report, hostName);
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 82ddb1c..022f7a0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -27,16 +27,20 @@ import javax.ws.rs.WebApplicationException;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotRegisteredException;
-import org.apache.ambari.server.agent.AgentReport;
import org.apache.ambari.server.agent.AgentReportsProcessor;
import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.CommandStatusAgentReport;
import org.apache.ambari.server.agent.ComponentStatus;
+import org.apache.ambari.server.agent.ComponentStatusAgentReport;
+import org.apache.ambari.server.agent.ComponentVersionAgentReport;
import org.apache.ambari.server.agent.HeartBeatHandler;
+import org.apache.ambari.server.agent.HostStatusAgentReport;
import org.apache.ambari.server.agent.stomp.dto.AckReport;
import org.apache.ambari.server.agent.stomp.dto.CommandStatusReports;
import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReport;
import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReports;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.state.Alert;
@@ -70,6 +74,15 @@ public class AgentReportsController {
agentReportsProcessor = injector.getInstance(AgentReportsProcessor.class);
}
+ @MessageMapping("/component_version")
+ public ReportsResponse handleComponentVersionReport(@Header String simpSessionId, ComponentVersionReports message)
+ throws WebApplicationException, InvalidStateTransitionException, AmbariException {
+
+ agentReportsProcessor.addAgentReport(new ComponentVersionAgentReport(hh,
+ agentSessionManager.getHost(simpSessionId).getHostName(), message));
+ return new ReportsResponse();
+ }
+
@MessageMapping("/component_status")
public ReportsResponse handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message)
throws WebApplicationException, InvalidStateTransitionException, AmbariException {
@@ -85,8 +98,8 @@ public class AgentReportsController {
}
}
- agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
- statuses, null, null));
+ agentReportsProcessor.addAgentReport(new ComponentStatusAgentReport(hh,
+ agentSessionManager.getHost(simpSessionId).getHostName(), statuses));
return new ReportsResponse();
}
@@ -98,15 +111,15 @@ public class AgentReportsController {
statuses.addAll(clusterReport.getValue());
}
- agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
- null, statuses, null));
+ agentReportsProcessor.addAgentReport(new CommandStatusAgentReport(hh,
+ agentSessionManager.getHost(simpSessionId).getHostName(), statuses));
return new ReportsResponse();
}
@MessageMapping("/host_status")
public ReportsResponse handleHostReportStatus(@Header String simpSessionId, HostStatusReport message) throws AmbariException {
- agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
- null, null, message));
+ agentReportsProcessor.addAgentReport(new HostStatusAgentReport(hh,
+ agentSessionManager.getHost(simpSessionId).getHostName(), message));
return new ReportsResponse();
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java
new file mode 100644
index 0000000..6619096
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+public class ComponentVersionReport {
+ private String componentName;
+ private String serviceName;
+ private String version;
+ private Long clusterId;
+
+ public ComponentVersionReport() {
+ }
+
+ public ComponentVersionReport(String componentName, String serviceName, String version, Long clusterId) {
+ this.componentName = componentName;
+ this.serviceName = serviceName;
+ this.version = version;
+ this.clusterId = clusterId;
+ }
+
+ public String getComponentName() {
+ return componentName;
+ }
+
+ public void setComponentName(String componentName) {
+ this.componentName = componentName;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java
new file mode 100644
index 0000000..21e4210
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.List;
+import java.util.TreeMap;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ComponentVersionReports {
+
+ @JsonProperty("clusters")
+ private TreeMap<String, List<ComponentVersionReport>> componentVersionReports;
+
+ public ComponentVersionReports() {
+ }
+
+ public ComponentVersionReports(TreeMap<String, List<ComponentVersionReport>> componentVersionReports) {
+ this.componentVersionReports = componentVersionReports;
+ }
+
+ public TreeMap<String, List<ComponentVersionReport>> getComponentVersionReports() {
+ return componentVersionReports;
+ }
+
+ public void setComponentVersionReports(TreeMap<String, List<ComponentVersionReport>> componentVersionReports) {
+ this.componentVersionReports = componentVersionReports;
+ }
+}