You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/08/08 11:05:22 UTC

[ambari] branch trunk updated: AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50a8350  AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk)
50a8350 is described below

commit 50a83500dca8494ae35751c4c94970b8ac5c249e
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Aug 7 22:27:39 2018 +0300

    AMBARI-24416. Component Versions Are Not Reported On Initial Status Commands Anymore (aonishuk)
---
 .../ambari_agent/ComponentVersionReporter.py       | 109 +++++++++++++++++++++
 .../src/main/python/ambari_agent/Constants.py      |   1 +
 .../ambari_agent/CustomServiceOrchestrator.py      |  21 ++--
 .../main/python/ambari_agent/HeartbeatThread.py    |   7 ++
 .../main/python/ambari_agent/models/commands.py    |   1 +
 .../resource_management/libraries/script/script.py |   9 +-
 .../apache/ambari/server/agent/AgentReport.java    |  36 +++----
 .../ambari/server/agent/AgentReportsProcessor.java |  14 +--
 ...ntReport.java => CommandStatusAgentReport.java} |  35 ++-----
 .../server/agent/ComponentStatusAgentReport.java   |  36 +++++++
 .../server/agent/ComponentVersionAgentReport.java  |  36 +++++++
 .../ambari/server/agent/HeartBeatHandler.java      |   5 +
 .../ambari/server/agent/HeartbeatProcessor.java    |  73 ++++++++++++--
 ...AgentReport.java => HostStatusAgentReport.java} |  36 ++-----
 .../server/agent/stomp/AgentReportsController.java |  27 +++--
 .../agent/stomp/dto/ComponentVersionReport.java    |  68 +++++++++++++
 .../agent/stomp/dto/ComponentVersionReports.java   |  45 +++++++++
 17 files changed, 443 insertions(+), 116 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py b/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py
new file mode 100644
index 0000000..eb4358b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentVersionReporter.py
@@ -0,0 +1,109 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import logging
+import threading
+
+from ambari_agent import Constants
+from collections import defaultdict
+
+from ambari_agent.models.commands import AgentCommand
+
+logger = logging.getLogger(__name__)
+
+class ComponentVersionReporter(threading.Thread):
+  def __init__(self, initializer_module):
+    self.initializer_module = initializer_module
+    self.topology_cache = initializer_module.topology_cache
+    self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
+    self.server_responses_listener = initializer_module.server_responses_listener
+    threading.Thread.__init__(self)
+
+  def run(self):
+    """
+    Get version of all components by running get_version execution command.
+    """
+    try:
+      cluster_reports = defaultdict(lambda:[])
+
+      for cluster_id in self.topology_cache.get_cluster_ids():
+        topology_cache = self.topology_cache[cluster_id]
+
+        if 'components' not in topology_cache:
+          continue
+
+        current_host_id = self.topology_cache.get_current_host_id(cluster_id)
+
+        if current_host_id is None:
+          continue
+
+        cluster_components = topology_cache.components
+        for component_dict in cluster_components:
+          # check if component is installed on current host
+          if current_host_id not in component_dict.hostIds:
+            continue
+
+          service_name = component_dict.serviceName
+          component_name = component_dict.componentName
+
+          result = self.check_component_version(cluster_id, service_name, component_name)
+
+          if result:
+            cluster_reports[cluster_id].append(result)
+
+      self.send_updates_to_server(cluster_reports)
+    except:
+      logger.exception("Exception in ComponentVersionReporter")
+
+  def check_component_version(self, cluster_id, service_name, component_name):
+    """
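+    Returns a version report dict for the given component, or None if it is not a known component or its version could not be read.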
+    """
+    # if not a component
+    if self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) is None:
+      return None
+
+    command_dict = {
+      'serviceName': service_name,
+      'role': component_name,
+      'clusterId': cluster_id,
+      'commandType': AgentCommand.get_version,
+    }
+
+    version_result = self.customServiceOrchestrator.requestComponentStatus(command_dict, command_name=AgentCommand.get_version)
+
+    if version_result['exitcode'] or not 'structuredOut' in version_result or not 'version' in version_result['structuredOut']:
+      logger.error("Could not get version for component {0} of {1} service cluster_id={2}. Command returned: {3}".format(component_name, service_name, cluster_id, version_result))
+      return None
+
+    # TODO: revisit handling of a missing structuredOut or version (the guard above already logs an error and returns None for both)
+
+    result = {
+      'serviceName': service_name,
+      'componentName': component_name,
+      'version': version_result['structuredOut']['version'],
+      'clusterId': cluster_id,
+    }
+
+    return result
+
+  def send_updates_to_server(self, cluster_reports):
+    if not cluster_reports or not self.initializer_module.is_registered:
+      return
+
+    self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_VERSION_REPORTS_ENDPOINT)
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index ed6b482..91141a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -38,6 +38,7 @@ CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
 HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params'
 ALERTS_DEFINITIONS_REQUEST_ENDPOINT = '/agents/alert_definitions'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
+COMPONENT_VERSION_REPORTS_ENDPOINT = '/reports/component_version'
 COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
 HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'
 ALERTS_STATUS_REPORTS_ENDPOINT = '/reports/alerts_status'
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index c41118f..5826f47 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -95,6 +95,8 @@ class CustomServiceOrchestrator(object):
                                                'status_command_stdout_{0}.txt')
     self.status_commands_stderr = os.path.join(self.tmp_dir,
                                                'status_command_stderr_{0}.txt')
+    self.status_structured_out = os.path.join(self.tmp_dir,
+                                               'status_structured-out-{0}.json')
 
     # Construct the hadoop credential lib JARs path
     self.credential_shell_lib_path = os.path.join(self.config.get('security', 'credential_lib_dir',
@@ -309,7 +311,7 @@ class CustomServiceOrchestrator(object):
     return cmd_result
 
   def runCommand(self, command_header, tmpoutfile, tmperrfile, forced_command_name=None,
-                 override_output_files=True, retry=False, is_status_command=False):
+                 override_output_files=True, retry=False, is_status_command=False, tmpstrucoutfile=None):
     """
     forced_command_name may be specified manually. In this case, value, defined at
     command json, is ignored.
@@ -350,7 +352,8 @@ class CustomServiceOrchestrator(object):
         script_path = self.resolve_script_path(base_dir, script)
         script_tuple = (script_path, base_dir)
 
-      tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id))
+      if not tmpstrucoutfile:
+        tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".format(task_id))
 
       # We don't support anything else yet
       if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
@@ -376,7 +379,7 @@ class CustomServiceOrchestrator(object):
         else:
           logger.info("Skipping generation of jceks files as this is a retry of the command")
 
-      json_path = self.dump_command_to_json(command, retry)
+      json_path = self.dump_command_to_json(command, retry, is_status_command)
       hooks = self.hooks_orchestrator.resolve_hooks(command, command_name)
       """:type hooks ambari_agent.CommandHooksOrchestrator.ResolvedHooks"""
 
@@ -510,7 +513,7 @@ class CustomServiceOrchestrator(object):
 
     return command
 
-  def requestComponentStatus(self, command_header):
+  def requestComponentStatus(self, command_header, command_name="STATUS"):
     """
      Component status is determined by exit code, returned by runCommand().
      Exit code 0 means that component is running and any other exit code means that
@@ -523,11 +526,13 @@ class CustomServiceOrchestrator(object):
     # make sure status commands that run in parallel don't use the same files
     status_commands_stdout = self.status_commands_stdout.format(uuid.uuid4())
     status_commands_stderr = self.status_commands_stderr.format(uuid.uuid4())
+    status_structured_out = self.status_structured_out.format(uuid.uuid4())
 
     try:
       res = self.runCommand(command_header, status_commands_stdout,
-                            status_commands_stderr, self.COMMAND_NAME_STATUS,
-                            override_output_files=override_output_files, is_status_command=True)
+                            status_commands_stderr, command_name,
+                            override_output_files=override_output_files, is_status_command=True,
+                            tmpstrucoutfile=status_structured_out)
     finally:
       try:
         os.unlink(status_commands_stdout)
@@ -547,14 +552,14 @@ class CustomServiceOrchestrator(object):
       raise AgentException(message)
     return path
 
-  def dump_command_to_json(self, command, retry=False):
+  def dump_command_to_json(self, command, retry=False, is_status_command=False):
     """
     Converts command to json file and returns file path
     """
     # Now, dump the json file
     command_type = command['commandType']
 
-    if command_type == AgentCommand.status:
+    if is_status_command:
       # make sure status commands that run in parallel don't use the same files
       file_path = os.path.join(self.tmp_dir, "status_command_{0}.json".format(uuid.uuid4()))
     else:
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 3403bb2..2d4e06b 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -27,6 +27,7 @@ from ambari_agent import Constants
 from ambari_agent.Register import Register
 from ambari_agent.Utils import BlockingDictionary
 from ambari_agent.Utils import Utils
+from ambari_agent.ComponentVersionReporter import ComponentVersionReporter
 from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
 from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
 from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
@@ -146,14 +147,20 @@ class HeartbeatThread(threading.Thread):
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
     self.run_post_registration_actions()
+
     self.initializer_module.is_registered = True
     # now when registration is done we can expose connection to other threads.
     self.initializer_module._connection = self.connection
 
+    self.report_components_initial_versions()
+
   def run_post_registration_actions(self):
     for post_registration_action in self.post_registration_actions:
       post_registration_action()
 
+  def report_components_initial_versions(self):
+    ComponentVersionReporter(self.initializer_module).start()
+
   def unregister(self):
     """
     Disconnect and remove connection object from initializer_module so other threads cannot use it
diff --git a/ambari-agent/src/main/python/ambari_agent/models/commands.py b/ambari-agent/src/main/python/ambari_agent/models/commands.py
index eb96e9a..e9ea2c8 100644
--- a/ambari-agent/src/main/python/ambari_agent/models/commands.py
+++ b/ambari-agent/src/main/python/ambari_agent/models/commands.py
@@ -19,6 +19,7 @@ limitations under the License.
 
 class AgentCommand(object):
   status = "STATUS_COMMAND"
+  get_version = "GET_VERSION"
   execution = "EXECUTION_COMMAND"
   auto_execution = "AUTO_EXECUTION_COMMAND"
   background_execution = "BACKGROUND_EXECUTION_COMMAND"
diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 10276ec..022dd82 100644
--- a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -260,10 +260,8 @@ class Script(object):
     stack_version_unformatted = str(default("/clusterLevelParams/stack_version", ""))
     stack_version_formatted = format_stack_version(stack_version_unformatted)
     if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
-      if command_name.lower() == "status":
-        request_version = default("/commandParams/request_version", None)
-        if request_version is not None:
-          return True
+      if command_name.lower() == "get_version":
+        return True
       else:
         # Populate version only on base commands
         return command_name.lower() == "start" or command_name.lower() == "install" or command_name.lower() == "restart"
@@ -362,6 +360,9 @@ class Script(object):
       if self.should_expose_component_version(self.command_name):
         self.save_component_version_to_structured_out(self.command_name)
 
+  def get_version(self, env):
+    pass
+
   def execute_prefix_function(self, command_name, afix, env):
     """
     Execute action afix (prefix or suffix) based on command_name and afix type
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
index 817a238..1ec7028 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,37 +17,25 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.List;
+import org.apache.ambari.server.AmbariException;
 
-import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+public abstract class AgentReport<R> {
 
-public class AgentReport {
+  private final String hostName;
+  private final R report;
 
-  private String hostName;
-  private List<ComponentStatus> componentStatuses;
-  private List<CommandReport> reports;
-  private HostStatusReport hostStatusReport;
-
-  public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
+  public AgentReport(String hostName, R report) {
     this.hostName = hostName;
-    this.componentStatuses = componentStatuses;
-    this.reports = reports;
-    this.hostStatusReport = hostStatusReport;
+    this.report = report;
   }
 
   public String getHostName() {
     return hostName;
   }
 
-  public List<ComponentStatus> getComponentStatuses() {
-    return componentStatuses;
-  }
-
-  public List<CommandReport> getCommandReports() {
-    return reports;
+  public final void process() throws AmbariException {
+    process(report, hostName);
   }
 
-  public HostStatusReport getHostStatusReport() {
-    return hostStatusReport;
-  }
+  protected abstract void process(R report, String hostName) throws AmbariException;
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
index ad5c6aa..7a2ec3a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
@@ -49,9 +49,6 @@ public class AgentReportsProcessor {
   }
 
   @Inject
-  private HeartBeatHandler hh;
-
-  @Inject
   private UnitOfWork unitOfWork;
 
   @Inject
@@ -77,17 +74,8 @@ public class AgentReportsProcessor {
     public void run() {
       try {
         unitOfWork.begin();
-        String hostName = agentReport.getHostName();
         try {
-
-          //TODO rewrite with polymorphism usage.
-          if (agentReport.getCommandReports() != null) {
-            hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName);
-          } else if (agentReport.getComponentStatuses() != null) {
-            hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName);
-          } else if (agentReport.getHostStatusReport() != null) {
-            hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName);
-          }
+          agentReport.process();
         } catch (AmbariException e) {
           LOG.error("Error processing agent reports", e);
         }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java
similarity index 50%
copy from ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
copy to ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java
index 817a238..04587ae 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandStatusAgentReport.java
@@ -19,35 +19,18 @@ package org.apache.ambari.server.agent;
 
 import java.util.List;
 
-import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+import org.apache.ambari.server.AmbariException;
 
-public class AgentReport {
+public class CommandStatusAgentReport extends AgentReport<List<CommandReport>> {
+  private final HeartBeatHandler hh;
 
-  private String hostName;
-  private List<ComponentStatus> componentStatuses;
-  private List<CommandReport> reports;
-  private HostStatusReport hostStatusReport;
-
-  public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
-    this.hostName = hostName;
-    this.componentStatuses = componentStatuses;
-    this.reports = reports;
-    this.hostStatusReport = hostStatusReport;
-  }
-
-  public String getHostName() {
-    return hostName;
-  }
-
-  public List<ComponentStatus> getComponentStatuses() {
-    return componentStatuses;
-  }
-
-  public List<CommandReport> getCommandReports() {
-    return reports;
+  public CommandStatusAgentReport(HeartBeatHandler hh, String hostName, List<CommandReport> commandReports) {
+    super(hostName, commandReports);
+    this.hh = hh;
   }
 
-  public HostStatusReport getHostStatusReport() {
-    return hostStatusReport;
+  @Override
+  protected void process(List<CommandReport> report, String hostName) throws AmbariException {
+    hh.handleCommandReportStatus(report, hostName);
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java
new file mode 100644
index 0000000..e6a3813
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatusAgentReport.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.List;
+
+import org.apache.ambari.server.AmbariException;
+
+public class ComponentStatusAgentReport extends AgentReport<List<ComponentStatus>> {
+  private final HeartBeatHandler hh;
+
+  public ComponentStatusAgentReport(HeartBeatHandler hh, String hostName, List<ComponentStatus> componentStatuses) {
+    super(hostName, componentStatuses);
+    this.hh = hh;
+  }
+
+  @Override
+  protected void process(List<ComponentStatus> report, String hostName) throws AmbariException {
+    hh.handleComponentReportStatus(report, hostName);
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java
new file mode 100644
index 0000000..405d34a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentVersionAgentReport.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
+
+public class ComponentVersionAgentReport extends AgentReport<ComponentVersionReports> {
+  private final HeartBeatHandler hh;
+
+  public ComponentVersionAgentReport(HeartBeatHandler hh, String hostName,
+                                     ComponentVersionReports componentVersionReports) {
+    super(hostName, componentVersionReports);
+    this.hh = hh;
+  }
+
+  @Override
+  protected void process(ComponentVersionReports report, String hostName) throws AmbariException {
+    hh.handleComponentVersionReports(report, hostName);
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 7d70390..1c225a9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.HostNotFoundException;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
 import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
@@ -256,6 +257,10 @@ public class HeartBeatHandler {
     }
   }
 
+  public void handleComponentVersionReports(ComponentVersionReports componentVersionReports, String hostname) throws AmbariException {
+    heartbeatProcessor.processVersionReports(componentVersionReports, hostname);
+  }
+
   protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
     LOG.debug("Received recovery report: {}", recoveryReport);
     Host host = clusterFsm.getHost(hostname);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
index ffbf3f3..e6b1937 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -41,6 +41,8 @@ import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReport;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
@@ -550,6 +552,70 @@ public class HeartbeatProcessor extends AbstractService{
   }
 
   /**
+   * Processes component version reports from an agent and publishes a version-advertised event for each matching host component.
+   * @throws AmbariException
+   */
+  public void processVersionReports(ComponentVersionReports versionReports, String hostname) throws AmbariException {
+    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
+    for (Cluster cl : clusters) {
+      for (Map.Entry<String, List<ComponentVersionReport>> status : versionReports
+          .getComponentVersionReports().entrySet()) {
+        if (Long.valueOf(status.getKey()).equals(cl.getClusterId())) {
+          for (ComponentVersionReport versionReport : status.getValue()) {
+            try {
+              Service svc = cl.getService(versionReport.getServiceName());
+
+              String componentName = versionReport.getComponentName();
+              if (svc.getServiceComponents().containsKey(componentName)) {
+                ServiceComponent svcComp = svc.getServiceComponent(
+                    componentName);
+                ServiceComponentHost scHost = svcComp.getServiceComponentHost(
+                    hostname);
+
+                String version = versionReport.getVersion();
+
+                HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl,
+                    scHost, version);
+                versionEventPublisher.publish(event);
+              }
+            } catch (ServiceNotFoundException e) {
+              LOG.warn("Received a version report for a non-initialized"
+                  + " service"
+                  + ", clusterId=" + versionReport.getClusterId()
+                  + ", serviceName=" + versionReport.getServiceName());
+              continue;
+            } catch (ServiceComponentNotFoundException e) {
+              LOG.warn("Received a version report for a non-initialized"
+                  + " servicecomponent"
+                  + ", clusterId=" + versionReport.getClusterId()
+                  + ", serviceName=" + versionReport.getServiceName()
+                  + ", componentName=" + versionReport.getComponentName());
+              continue;
+            } catch (ServiceComponentHostNotFoundException e) {
+              LOG.warn("Received a version report for a non-initialized"
+                  + " hostcomponent"
+                  + ", clusterId=" + versionReport.getClusterId()
+                  + ", serviceName=" + versionReport.getServiceName()
+                  + ", componentName=" + versionReport.getComponentName()
+                  + ", hostname=" + hostname);
+              continue;
+            } catch (RuntimeException e) {
+              LOG.warn("Received a version report with invalid payload"
+                  + " service"
+                  + ", clusterId=" + versionReport.getClusterId()
+                  + ", serviceName=" + versionReport.getServiceName()
+                  + ", componentName=" + versionReport.getComponentName()
+                  + ", hostname=" + hostname
+                  + ", error=" + e.getMessage());
+              continue;
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Process reports of status commands
    * @throws AmbariException
    */
@@ -596,13 +662,6 @@ public class HeartbeatProcessor extends AbstractService{
                     List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
                     scHost.setProcesses(list);
                   }
-                  if (extra.containsKey("version")) {
-                    String version = extra.get("version").toString();
-
-                    HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl, scHost, version);
-                    versionEventPublisher.publish(event);
-                  }
-
                 } catch (Exception e) {
                   LOG.error("Could not access extra JSON for " +
                       scHost.getServiceComponentName() + " from " +
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java
similarity index 53%
copy from ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
copy to ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java
index 817a238..a85f961 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatusAgentReport.java
@@ -17,37 +17,19 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.List;
-
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
 
-public class AgentReport {
-
-  private String hostName;
-  private List<ComponentStatus> componentStatuses;
-  private List<CommandReport> reports;
-  private HostStatusReport hostStatusReport;
-
-  public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
-    this.hostName = hostName;
-    this.componentStatuses = componentStatuses;
-    this.reports = reports;
-    this.hostStatusReport = hostStatusReport;
-  }
-
-  public String getHostName() {
-    return hostName;
-  }
-
-  public List<ComponentStatus> getComponentStatuses() {
-    return componentStatuses;
-  }
+public class HostStatusAgentReport extends AgentReport<HostStatusReport> { // AgentReport specialisation whose payload is a HostStatusReport
+  private final HeartBeatHandler hh; // handler that process() forwards the report to
 
-  public List<CommandReport> getCommandReports() {
-    return reports;
+  public HostStatusAgentReport(HeartBeatHandler hh, String hostName, HostStatusReport hostStatusReport) {
+    super(hostName, hostStatusReport); // host name and payload are handed to the AgentReport constructor
+    this.hh = hh;
   }
 
-  public HostStatusReport getHostStatusReport() {
-    return hostStatusReport;
+  @Override
+  protected void process(HostStatusReport report, String hostName) throws AmbariException {
+    hh.handleHostReportStatus(report, hostName); // pure delegation; any AmbariException propagates to the caller
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 82ddb1c..022f7a0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -27,16 +27,20 @@ import javax.ws.rs.WebApplicationException;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.HostNotRegisteredException;
-import org.apache.ambari.server.agent.AgentReport;
 import org.apache.ambari.server.agent.AgentReportsProcessor;
 import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.CommandStatusAgentReport;
 import org.apache.ambari.server.agent.ComponentStatus;
+import org.apache.ambari.server.agent.ComponentStatusAgentReport;
+import org.apache.ambari.server.agent.ComponentVersionAgentReport;
 import org.apache.ambari.server.agent.HeartBeatHandler;
+import org.apache.ambari.server.agent.HostStatusAgentReport;
 import org.apache.ambari.server.agent.stomp.dto.AckReport;
 import org.apache.ambari.server.agent.stomp.dto.CommandStatusReports;
 import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReport;
 import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReports;
+import org.apache.ambari.server.agent.stomp.dto.ComponentVersionReports;
 import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
 import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.state.Alert;
@@ -70,6 +74,15 @@ public class AgentReportsController {
     agentReportsProcessor = injector.getInstance(AgentReportsProcessor.class);
   }
 
+  @MessageMapping("/component_version") // entry point for messages mapped to "/component_version"
+  public ReportsResponse handleComponentVersionReport(@Header String simpSessionId, ComponentVersionReports message)
+      throws WebApplicationException, InvalidStateTransitionException, AmbariException {
+
+    agentReportsProcessor.addAgentReport(new ComponentVersionAgentReport(hh, // wrap the message and hand it to agentReportsProcessor
+        agentSessionManager.getHost(simpSessionId).getHostName(), message)); // host name is looked up from the session id
+    return new ReportsResponse(); // a fresh ReportsResponse is returned once the report has been handed over
+  }
+
   @MessageMapping("/component_status")
   public ReportsResponse handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message)
       throws WebApplicationException, InvalidStateTransitionException, AmbariException {
@@ -85,8 +98,8 @@ public class AgentReportsController {
       }
     }
 
-    agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
-        statuses, null, null));
+    agentReportsProcessor.addAgentReport(new ComponentStatusAgentReport(hh,
+        agentSessionManager.getHost(simpSessionId).getHostName(), statuses));
     return new ReportsResponse();
   }
 
@@ -98,15 +111,15 @@ public class AgentReportsController {
       statuses.addAll(clusterReport.getValue());
     }
 
-    agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
-        null, statuses, null));
+    agentReportsProcessor.addAgentReport(new CommandStatusAgentReport(hh,
+        agentSessionManager.getHost(simpSessionId).getHostName(), statuses));
     return new ReportsResponse();
   }
 
   @MessageMapping("/host_status")
   public ReportsResponse handleHostReportStatus(@Header String simpSessionId, HostStatusReport message) throws AmbariException {
-    agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
-        null, null, message));
+    agentReportsProcessor.addAgentReport(new HostStatusAgentReport(hh, // dedicated report type replaces the old AgentReport(host, null, null, message) call
+        agentSessionManager.getHost(simpSessionId).getHostName(), message)); // host name is looked up from the session id
     return new ReportsResponse();
   }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java
new file mode 100644
index 0000000..6619096
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReport.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+public class ComponentVersionReport { // plain data holder: component name, service name, version and cluster id with getters/setters
+  private String componentName;
+  private String serviceName;
+  private String version;
+  private Long clusterId; // boxed Long, so it is null until assigned
+
+  public ComponentVersionReport() { // no-arg form leaves every field null; presumably used by the JSON mapper - TODO confirm
+  }
+
+  public ComponentVersionReport(String componentName, String serviceName, String version, Long clusterId) { // stores all four values as given, no validation
+    this.componentName = componentName;
+    this.serviceName = serviceName;
+    this.version = version;
+    this.clusterId = clusterId;
+  }
+
+  public String getComponentName() {
+    return componentName;
+  }
+
+  public void setComponentName(String componentName) {
+    this.componentName = componentName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  public Long getClusterId() {
+    return clusterId;
+  }
+
+  public void setClusterId(Long clusterId) {
+    this.clusterId = clusterId;
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java
new file mode 100644
index 0000000..21e4210
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentVersionReports.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.List;
+import java.util.TreeMap;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ComponentVersionReports { // wrapper around a single map of ComponentVersionReport lists
+
+  @JsonProperty("clusters") // the map is bound to the JSON property "clusters"
+  private TreeMap<String, List<ComponentVersionReport>> componentVersionReports; // keys are presumably cluster ids - TODO confirm against the agent payload
+
+  public ComponentVersionReports() { // no-arg form: the map stays null until set
+  }
+
+  public ComponentVersionReports(TreeMap<String, List<ComponentVersionReport>> componentVersionReports) { // keeps the given map reference, no copy
+    this.componentVersionReports = componentVersionReports;
+  }
+
+  public TreeMap<String, List<ComponentVersionReport>> getComponentVersionReports() { // returns the stored map itself, which may be null
+    return componentVersionReports;
+  }
+
+  public void setComponentVersionReports(TreeMap<String, List<ComponentVersionReport>> componentVersionReports) {
+    this.componentVersionReports = componentVersionReports;
+  }
+}