You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2016/02/23 20:04:59 UTC

[3/3] ambari git commit: AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)

AMBARI-15141. Start all services request aborts in the middle and hosts go into heartbeat-lost state. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/083ac6da
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/083ac6da
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/083ac6da

Branch: refs/heads/trunk
Commit: 083ac6dab5cf59c01da054eb656507c089a54620
Parents: 9d7ff5f
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Feb 23 13:01:13 2016 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Tue Feb 23 21:04:22 2016 +0200

----------------------------------------------------------------------
 .../ambari/server/agent/HeartBeatHandler.java   |  582 +------
 .../ambari/server/agent/HeartbeatMonitor.java   |    6 +
 .../ambari/server/agent/HeartbeatProcessor.java |  773 +++++++++
 .../ambari/server/orm/dao/HostVersionDAO.java   |   78 +-
 .../server/orm/entities/HostVersionEntity.java  |    9 +
 .../server/state/cluster/ClusterImpl.java       |    6 +-
 .../svccomphost/ServiceComponentHostImpl.java   |   72 +-
 .../server/agent/HeartbeatProcessorTest.java    | 1290 +++++++++++++++
 .../server/agent/HeartbeatTestHelper.java       |  229 +++
 .../server/agent/TestHeartbeatHandler.java      | 1489 ++----------------
 10 files changed, 2559 insertions(+), 1975 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 248ce4b..ba14446 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -126,6 +126,7 @@ public class HeartBeatHandler {
   private final ActionQueue actionQueue;
   private final ActionManager actionManager;
   private HeartbeatMonitor heartbeatMonitor;
+  private HeartbeatProcessor heartbeatProcessor;
 
   @Inject
   private Injector injector;
@@ -137,38 +138,11 @@ public class HeartBeatHandler {
   private AmbariMetaInfo ambariMetaInfo;
 
   @Inject
-  private ActionMetadata actionMetadata;
-
-  @Inject
-  private Gson gson;
-
-  @Inject
   private ConfigHelper configHelper;
 
   @Inject
-  private HostDAO hostDAO;
-
-  @Inject
   private AlertDefinitionHash alertDefinitionHash;
 
-  /**
-   * Publishes {@link AlertEvent} instances.
-   */
-  @Inject
-  private AlertEventPublisher alertEventPublisher;
-
-  @Inject
-  private AmbariEventPublisher ambariEventPublisher;
-
-  @Inject
-  private VersionEventPublisher versionEventPublisher;
-
-
-  /**
-   * KerberosPrincipalHostDAO used to set and get Kerberos principal details
-   */
-  @Inject
-  private KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
 
   /**
    * KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances
@@ -187,10 +161,12 @@ public class HeartBeatHandler {
     actionQueue = aq;
     actionManager = am;
     heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000, injector);
+    heartbeatProcessor = new HeartbeatProcessor(fsm, am, heartbeatMonitor, injector); //TODO modify to match pattern
     injector.injectMembers(this);
   }
 
   public void start() {
+    heartbeatProcessor.startAsync();
     heartbeatMonitor.start();
   }
 
@@ -198,6 +174,14 @@ public class HeartBeatHandler {
     this.heartbeatMonitor = heartbeatMonitor;
   }
 
+  public void setHeartbeatProcessor(HeartbeatProcessor heartbeatProcessor) {
+    this.heartbeatProcessor = heartbeatProcessor;
+  }
+
+  public HeartbeatProcessor getHeartbeatProcessor() {
+    return heartbeatProcessor;
+  }
+
   public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
       throws AmbariException {
     long now = System.currentTimeMillis();
@@ -283,18 +267,7 @@ public class HeartBeatHandler {
       return createRegisterCommand();
     }
 
-    // Examine heartbeat for command reports
-    processCommandReports(heartbeat, hostname, clusterFsm, now);
-
-    // Examine heartbeat for component live status reports
-    processStatusReports(heartbeat, hostname, clusterFsm);
-
-    // Calculate host status
-    // NOTE: This step must be after processing command/status reports
-    processHostStatus(heartbeat, hostname);
-
-    // Example heartbeat for alerts from the host or its components
-    processAlerts(heartbeat, hostname);
+    heartbeatProcessor.addHeartbeat(heartbeat);
 
     // Send commands if node is active
     if (hostObject.getState().equals(HostState.HEALTHY)) {
@@ -305,33 +278,7 @@ public class HeartBeatHandler {
     return response;
   }
 
-  /**
-   * Extracts all of the {@link Alert}s from the heartbeat and fires
-   * {@link AlertEvent}s for each one. If there is a problem looking up the
-   * cluster, then alerts will not be processed.
-   *
-   * @param heartbeat
-   *          the heartbeat to process.
-   * @param hostname
-   *          the host that the heartbeat is for.
-   */
-  protected void processAlerts(HeartBeat heartbeat, String hostname) {
-
-    if (null == hostname || null == heartbeat) {
-      return;
-    }
 
-    if (null != heartbeat.getAlerts()) {
-      AlertEvent event = new AlertReceivedEvent(heartbeat.getAlerts());
-      for (Alert alert : event.getAlerts()) {
-        if (alert.getHostName() == null) {
-          alert.setHostName(hostname);
-        }
-      }
-      alertEventPublisher.publish(event);
-
-    }
-  }
 
   protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
     LOG.debug("Received recovery report: " + recoveryReport.toString());
@@ -339,480 +286,6 @@ public class HeartBeatHandler {
     host.setRecoveryReport(recoveryReport);
   }
 
-  protected void processHostStatus(HeartBeat heartbeat, String hostname) throws AmbariException {
-
-    Host host = clusterFsm.getHost(hostname);
-    HealthStatus healthStatus = host.getHealthStatus().getHealthStatus();
-
-    if (!healthStatus.equals(HostHealthStatus.HealthStatus.UNKNOWN)) {
-
-      List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus();
-      //Host status info could be calculated only if agent returned statuses in heartbeat
-      //Or, if a command is executed that can change component status
-      boolean calculateHostStatus = false;
-      String clusterName = null;
-      if (componentStatuses.size() > 0) {
-        calculateHostStatus = true;
-        for (ComponentStatus componentStatus : componentStatuses) {
-          clusterName = componentStatus.getClusterName();
-          break;
-        }
-      }
-
-      if (!calculateHostStatus) {
-        List<CommandReport> reports = heartbeat.getReports();
-        for (CommandReport report : reports) {
-          if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
-            continue;
-          }
-
-          String service = report.getServiceName();
-          if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
-            continue;
-          }
-          if (report.getStatus().equals("COMPLETED")) {
-            calculateHostStatus = true;
-            clusterName = report.getClusterName();
-            break;
-          }
-        }
-      }
-
-      if (calculateHostStatus) {
-        //Use actual component status to compute the host status
-        int masterCount = 0;
-        int mastersRunning = 0;
-        int slaveCount = 0;
-        int slavesRunning = 0;
-
-        StackId stackId;
-        Cluster cluster = clusterFsm.getCluster(clusterName);
-        stackId = cluster.getDesiredStackVersion();
-
-        MaintenanceStateHelper psh = injector.getInstance(MaintenanceStateHelper.class);
-
-        List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname());
-        for (ServiceComponentHost scHost : scHosts) {
-          ComponentInfo componentInfo =
-              ambariMetaInfo.getComponent(stackId.getStackName(),
-                  stackId.getStackVersion(), scHost.getServiceName(),
-                  scHost.getServiceComponentName());
-
-          String status = scHost.getState().name();
-
-          String category = componentInfo.getCategory();
-
-          if (MaintenanceState.OFF == psh.getEffectiveState(scHost, host)) {
-            if (category.equals("MASTER")) {
-              ++masterCount;
-              if (status.equals("STARTED")) {
-                ++mastersRunning;
-              }
-            } else if (category.equals("SLAVE")) {
-              ++slaveCount;
-              if (status.equals("STARTED")) {
-                ++slavesRunning;
-              }
-            }
-          }
-        }
-
-        if (masterCount == mastersRunning && slaveCount == slavesRunning) {
-          healthStatus = HealthStatus.HEALTHY;
-        } else if (masterCount > 0 && mastersRunning < masterCount) {
-          healthStatus = HealthStatus.UNHEALTHY;
-        } else {
-          healthStatus = HealthStatus.ALERT;
-        }
-
-        host.setStatus(healthStatus.name());
-        host.persist();
-      }
-
-      //If host doesn't belong to any cluster
-      if ((clusterFsm.getClustersForHost(host.getHostName())).size() == 0) {
-        healthStatus = HealthStatus.HEALTHY;
-        host.setStatus(healthStatus.name());
-        host.persist();
-      }
-    }
-  }
-
-  protected void processCommandReports(
-      HeartBeat heartbeat, String hostname, Clusters clusterFsm, long now)
-      throws AmbariException {
-    List<CommandReport> reports = heartbeat.getReports();
-
-    // Cache HostRoleCommand entities because we will need them few times
-    List<Long> taskIds = new ArrayList<Long>();
-    for (CommandReport report : reports) {
-      taskIds.add(report.getTaskId());
-    }
-    Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
-
-    Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
-    for (CommandReport report : reports) {
-
-      Long clusterId = null;
-      if (report.getClusterName() != null) {
-        try {
-          Cluster cluster = clusterFsm.getCluster(report.getClusterName());
-          clusterId = Long.valueOf(cluster.getClusterId());
-        } catch (AmbariException e) {
-        }
-      }
-
-      LOG.debug("Received command report: " + report);
-      // Fetch HostRoleCommand that corresponds to a given task ID
-      HostRoleCommand hostRoleCommand = hostRoleCommandIterator.next();
-      HostEntity hostEntity = hostDAO.findByName(hostname);
-      if (hostEntity == null) {
-        LOG.error("Received a command report and was unable to retrieve HostEntity for hostname = " + hostname);
-        continue;
-      }
-
-      // Send event for final command reports for actions
-      if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
-          HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
-        ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
-                clusterId, hostname, report, false);
-        ambariEventPublisher.publish(event);
-      }
-
-      // Skip sending events for command reports for ABORTed commands
-      if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
-        continue;
-      }
-      if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
-              report.getStatus().equals("IN_PROGRESS")) {
-        hostRoleCommand.setStartTime(now);
-      }
-
-      // If the report indicates the keytab file was successfully transferred to a host or removed
-      // from a host, record this for future reference
-      if (Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
-          Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
-          RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
-          RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus())) {
-
-        String customCommand = report.getCustomCommand();
-
-        boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
-        if (adding || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
-          WriteKeytabsStructuredOut writeKeytabsStructuredOut;
-          try {
-            writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
-          } catch (JsonSyntaxException ex) {
-            //Json structure was incorrect do nothing, pass this data further for processing
-            writeKeytabsStructuredOut = null;
-          }
-
-          if (writeKeytabsStructuredOut != null) {
-            Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
-            if (keytabs != null) {
-              for (Map.Entry<String, String> entry : keytabs.entrySet()) {
-                String principal = entry.getKey();
-                if (!kerberosPrincipalHostDAO.exists(principal, hostEntity.getHostId())) {
-                  if (adding) {
-                    kerberosPrincipalHostDAO.create(principal, hostEntity.getHostId());
-                  } else if ("_REMOVED_".equalsIgnoreCase(entry.getValue())) {
-                    kerberosPrincipalHostDAO.remove(principal, hostEntity.getHostId());
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-
-      //pass custom START, STOP and RESTART
-      if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
-         (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
-         !("RESTART".equals(report.getCustomCommand()) ||
-         "START".equals(report.getCustomCommand()) ||
-         "STOP".equals(report.getCustomCommand())))) {
-        continue;
-      }
-
-      Cluster cl = clusterFsm.getCluster(report.getClusterName());
-      String service = report.getServiceName();
-      if (service == null || service.isEmpty()) {
-        throw new AmbariException("Invalid command report, service: " + service);
-      }
-      if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
-        LOG.debug(report.getRole() + " is an action - skip component lookup");
-      } else {
-        try {
-          Service svc = cl.getService(service);
-          ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
-          ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
-          String schName = scHost.getServiceComponentName();
-
-          if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
-
-            // Reading component version if it is present
-            if (StringUtils.isNotBlank(report.getStructuredOut())) {
-              ComponentVersionStructuredOut structuredOutput = null;
-              try {
-                structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
-              } catch (JsonSyntaxException ex) {
-                //Json structure for component version was incorrect
-                //do nothing, pass this data further for processing
-              }
-
-              String newVersion = structuredOutput == null ? null : structuredOutput.version;
-
-              // Pass true to always publish a version event.  It is safer to recalculate the version even if we don't
-              // detect a difference in the value.  This is useful in case that a manual database edit is done while
-              // ambari-server is stopped.
-              handleComponentVersionReceived(cl, scHost, newVersion, true);
-            }
-
-            // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
-            if (scHost.getState().equals(State.UPGRADING)) {
-              scHost.setStackVersion(scHost.getDesiredStackVersion());
-            } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
-                (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
-                    ("START".equals(report.getCustomCommand()) ||
-                    "RESTART".equals(report.getCustomCommand()))))
-                && null != report.getConfigurationTags()
-                && !report.getConfigurationTags().isEmpty()) {
-              LOG.info("Updating applied config on service " + scHost.getServiceName() +
-                ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
-              scHost.updateActualConfigs(report.getConfigurationTags());
-              scHost.setRestartRequired(false);
-            }
-            // Necessary for resetting clients stale configs after starting service
-            if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
-                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
-                "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){
-              scHost.updateActualConfigs(report.getConfigurationTags());
-              scHost.setRestartRequired(false);
-            }
-            if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
-                !("START".equals(report.getCustomCommand()) ||
-                 "STOP".equals(report.getCustomCommand()))) {
-              //do not affect states for custom commands except START and STOP
-              //lets status commands to be responsible for this
-              continue;
-            }
-
-            if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
-                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
-                    "START".equals(report.getCustomCommand()))) {
-              scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
-                  hostname, now));
-              scHost.setRestartRequired(false);
-            } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
-                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
-                    "STOP".equals(report.getCustomCommand()))) {
-              scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
-                  hostname, now));
-            } else {
-              scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
-                  hostname, now));
-            }
-          } else if (report.getStatus().equals("FAILED")) {
-
-            if (StringUtils.isNotBlank(report.getStructuredOut())) {
-              try {
-                ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
-
-                if (null != structuredOutput.upgradeDirection && structuredOutput.upgradeDirection.isUpgrade()) {
-                  scHost.setUpgradeState(UpgradeState.FAILED);
-                }
-              } catch (JsonSyntaxException ex) {
-                LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
-              }
-            }
-
-            LOG.warn("Operation failed - may be retried. Service component host: "
-                + schName + ", host: " + hostname + " Action id" + report.getActionId());
-            if (actionManager.isInProgressCommand(report)) {
-              scHost.handleEvent(new ServiceComponentHostOpFailedEvent
-                (schName, hostname, now));
-            } else {
-              LOG.info("Received report for a command that is no longer active. " + report);
-            }
-          } else if (report.getStatus().equals("IN_PROGRESS")) {
-            scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
-                hostname, now));
-          }
-        } catch (ServiceComponentNotFoundException scnex) {
-          LOG.warn("Service component not found ", scnex);
-        } catch (InvalidStateTransitionException ex) {
-          if (LOG.isDebugEnabled()) {
-            LOG.warn("State machine exception.", ex);
-          } else {
-            LOG.warn("State machine exception. " + ex.getMessage());
-          }
-        }
-      }
-    }
-
-    //Update state machines from reports
-    actionManager.processTaskResponse(hostname, reports, commands);
-  }
-
-  protected void processStatusReports(HeartBeat heartbeat,
-                                      String hostname,
-                                      Clusters clusterFsm)
-      throws AmbariException {
-    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
-    for (Cluster cl : clusters) {
-      for (ComponentStatus status : heartbeat.componentStatus) {
-        if (status.getClusterName().equals(cl.getClusterName())) {
-          try {
-            Service svc = cl.getService(status.getServiceName());
-
-            String componentName = status.getComponentName();
-            if (svc.getServiceComponents().containsKey(componentName)) {
-              ServiceComponent svcComp = svc.getServiceComponent(
-                  componentName);
-              ServiceComponentHost scHost = svcComp.getServiceComponentHost(
-                  hostname);
-              State prevState = scHost.getState();
-              State liveState = State.valueOf(State.class, status.getStatus());
-              if (prevState.equals(State.INSTALLED)
-                  || prevState.equals(State.STARTED)
-                  || prevState.equals(State.STARTING)
-                  || prevState.equals(State.STOPPING)
-                  || prevState.equals(State.UNKNOWN)) {
-                scHost.setState(liveState); //TODO direct status set breaks state machine sometimes !!!
-                if (!prevState.equals(liveState)) {
-                  LOG.info("State of service component " + componentName
-                      + " of service " + status.getServiceName()
-                      + " of cluster " + status.getClusterName()
-                      + " has changed from " + prevState + " to " + liveState
-                      + " at host " + hostname);
-                }
-              }
-
-              SecurityState prevSecurityState = scHost.getSecurityState();
-              SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
-              if((prevSecurityState != currentSecurityState)) {
-                if(prevSecurityState.isEndpoint()) {
-                  scHost.setSecurityState(currentSecurityState);
-                  LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
-                          "has changed from %s to %s on host %s",
-                      componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
-                      currentSecurityState, hostname));
-                }
-                else {
-                  LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
-                          "has changed from %s to %s on host %s but will be ignored since %s is a " +
-                          "transitional state",
-                      componentName, status.getServiceName(), status.getClusterName(),
-                      prevSecurityState, currentSecurityState, hostname, prevSecurityState));
-                }
-              }
-
-              if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
-                scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
-              }
-
-              if (null != status.getConfigTags()) {
-                scHost.updateActualConfigs(status.getConfigTags());
-              }
-
-              Map<String, Object> extra = status.getExtra();
-              if (null != extra && !extra.isEmpty()) {
-                try {
-                  if (extra.containsKey("processes")) {
-                    @SuppressWarnings("unchecked")
-                    List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
-                    scHost.setProcesses(list);
-                  }
-                  if (extra.containsKey("version")) {
-                    String version = extra.get("version").toString();
-
-                    handleComponentVersionReceived(cl, scHost, version, false);
-                  }
-
-                } catch (Exception e) {
-                  LOG.error("Could not access extra JSON for " +
-                      scHost.getServiceComponentName() + " from " +
-                      scHost.getHostName() + ": " + status.getExtra() +
-                      " (" + e.getMessage() + ")");
-                }
-              }
-
-              this.heartbeatMonitor.getAgentRequests()
-                  .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
-            } else {
-              // TODO: What should be done otherwise?
-            }
-          } catch (ServiceNotFoundException e) {
-            LOG.warn("Received a live status update for a non-initialized"
-                + " service"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName());
-            // FIXME ignore invalid live update and continue for now?
-            continue;
-          } catch (ServiceComponentNotFoundException e) {
-            LOG.warn("Received a live status update for a non-initialized"
-                + " servicecomponent"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName()
-                + ", componentName=" + status.getComponentName());
-            // FIXME ignore invalid live update and continue for now?
-            continue;
-          } catch (ServiceComponentHostNotFoundException e) {
-            LOG.warn("Received a live status update for a non-initialized"
-                + " service"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName()
-                + ", componentName=" + status.getComponentName()
-                + ", hostname=" + hostname);
-            // FIXME ignore invalid live update and continue for now?
-            continue;
-          } catch (RuntimeException e) {
-            LOG.warn("Received a live status with invalid payload"
-                + " service"
-                + ", clusterName=" + status.getClusterName()
-                + ", serviceName=" + status.getServiceName()
-                + ", componentName=" + status.getComponentName()
-                + ", hostname=" + hostname
-                + ", error=" + e.getMessage());
-            continue;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Updates the version of the given service component, sets the upgrade state (if needed)
-   * and publishes a version event through the version event publisher.
-   *
-   * @param cluster        the cluster
-   * @param scHost         service component host
-   * @param newVersion     new version of service component
-   * @param alwaysPublish  if true, always publish a version event; if false,
-   *                       only publish if the component version was updated
-   */
-  private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
-                                              String newVersion, boolean alwaysPublish) {
-
-    boolean updated = false;
-
-    if (StringUtils.isNotBlank(newVersion)) {
-      final String previousVersion = scHost.getVersion();
-      if (!StringUtils.equals(previousVersion, newVersion)) {
-        scHost.setVersion(newVersion);
-        scHost.setStackVersion(cluster.getDesiredStackVersion());
-        if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
-          scHost.setUpgradeState(UpgradeState.COMPLETE);
-        }
-        updated = true;
-      }
-    }
-
-    if (updated || alwaysPublish) {
-      HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
-      versionEventPublisher.publish(event);
-    }
-  }
 
   /**
    * Adds commands from action queue to a heartbeat response.
@@ -1229,35 +702,4 @@ public class HeartBeatHandler {
     }
   }
 
-  /**
-   * This class is used for mapping json of structured output for component START action.
-   */
-  private static class ComponentVersionStructuredOut {
-    @SerializedName("version")
-    private String version;
-
-    @SerializedName("upgrade_type")
-    private UpgradeType upgradeType = null;
-
-    @SerializedName("direction")
-    private Direction upgradeDirection = null;
-
-  }
-
-  /**
-   * This class is used for mapping json of structured output for keytab distribution actions.
-   */
-  private static class WriteKeytabsStructuredOut {
-    @SerializedName("keytabs")
-    private Map<String,String> keytabs;
-
-    public Map<String, String> getKeytabs() {
-      return keytabs;
-    }
-
-    public void setKeytabs(Map<String, String> keytabs) {
-      this.keytabs = keytabs;
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index efc717d..378e123 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -140,6 +140,10 @@ public class HeartbeatMonitor implements Runnable {
     List<Host> allHosts = clusters.getHosts();
     long now = System.currentTimeMillis();
     for (Host hostObj : allHosts) {
+      if (hostObj.getState() == HostState.HEARTBEAT_LOST) {
+        //skip hosts that are already known to be lost
+        continue;
+      }
       String host = hostObj.getHostName();
       HostState hostState = hostObj.getState();
       String hostname = hostObj.getHostName();
@@ -212,6 +216,8 @@ public class HeartbeatMonitor implements Runnable {
         switch (sch.getState()) {
           case INIT:
           case INSTALLING:
+          case STARTING:
+          case STOPPING:
             //don't send commands until component is installed at least
             continue;
           default:

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
new file mode 100644
index 0000000..2188a77
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.annotations.SerializedName;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.ServiceComponentHostNotFoundException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
+import org.apache.ambari.server.ServiceNotFoundException;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
+import org.apache.ambari.server.metadata.ActionMetadata;
+import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.ComponentInfo;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostHealthStatus;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.SecurityState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.UpgradeState;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HeartbeatProcessor class is used for bulk processing data retrieved from agents in background
+ *
+ */
+public class HeartbeatProcessor extends AbstractService{
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatProcessor.class);
+
+  private ScheduledExecutorService executor;
+
+  private ConcurrentLinkedQueue<HeartBeat> heartBeatsQueue = new ConcurrentLinkedQueue<>();
+
+  private volatile boolean shouldRun = true;
+
+  //TODO rewrite to correlate with heartbeat frequency, hardcoded in agent as of now
+  private long delay = 5000;
+  private long period = 1000;
+
+  private int poolSize = 1;
+
+  private Clusters clusterFsm;
+  private HeartbeatMonitor heartbeatMonitor;
+  private Injector injector;
+  private ActionManager actionManager;
+
+  /**
+   * Publishes {@link AlertEvent} instances.
+   */
+  @Inject
+  AlertEventPublisher alertEventPublisher;
+
+  @Inject
+  AmbariEventPublisher ambariEventPublisher;
+
+  @Inject
+  VersionEventPublisher versionEventPublisher;
+
+  @Inject
+  ActionMetadata actionMetadata;
+
+  @Inject
+  MaintenanceStateHelper maintenanceStateHelper;
+
+  @Inject
+  AmbariMetaInfo ambariMetaInfo;
+
+  @Inject
+  KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
+
+  @Inject
+  Gson gson;
+
+  /**
+   * Builds the processor: injects the annotated members, stores the collaborators and
+   * creates the scheduled pool ({@code poolSize} threads) whose threads are named
+   * {@code ambari-heartbeat-processor-N}. No task is scheduled here; that happens in
+   * {@link #doStart()}.
+   *
+   * @param clusterFsm       clusters state used to resolve hosts and clusters
+   * @param am               action manager used to look up and update tasks
+   * @param heartbeatMonitor monitor whose agent requests are updated from status reports
+   * @param injector         injector used to populate the {@code @Inject} fields
+   */
+  @Inject
+  public HeartbeatProcessor(Clusters clusterFsm, ActionManager am, HeartbeatMonitor heartbeatMonitor,
+                            Injector injector) {
+    // populate the @Inject-annotated fields of this instance
+    injector.injectMembers(this);
+
+    this.clusterFsm = clusterFsm;
+    this.actionManager = am;
+    this.heartbeatMonitor = heartbeatMonitor;
+    this.injector = injector;
+
+    ThreadFactoryBuilder namedThreads = new ThreadFactoryBuilder().setNameFormat("ambari-heartbeat-processor-%d");
+    this.executor = Executors.newScheduledThreadPool(poolSize, namedThreads.build());
+  }
+
+  /**
+   * Schedules {@code poolSize} {@link HeartbeatProcessingTask}s at a fixed rate
+   * (first run after {@code delay} ms, then every {@code period} ms) and reports
+   * the service as started.
+   */
+  @Override
+  protected void doStart() {
+    LOG.info("**** Starting heartbeats processing threads ****");
+    for (int i=0; i< poolSize; i++) {
+      executor.scheduleAtFixedRate(new HeartbeatProcessingTask(), delay, period, TimeUnit.MILLISECONDS);
+    }
+    // AbstractService contract: doStart() must lead to notifyStarted(), otherwise the
+    // service never leaves the STARTING state and a later stop request never reaches doStop()
+    notifyStarted();
+  }
+
+  /**
+   * Asks the processing tasks to stop draining the queue, shuts the scheduler down
+   * (already running tasks are allowed to finish) and reports the service as stopped.
+   */
+  @Override
+  protected void doStop() {
+    LOG.info("**** Stopping heartbeats processing threads ****");
+    shouldRun = false;
+    executor.shutdown();
+    // AbstractService contract: doStop() must lead to notifyStopped(), otherwise the
+    // service never reaches the TERMINATED state
+    notifyStopped();
+  }
+
+  /**
+   * Appends a heartbeat to the queue that the background processing tasks drain.
+   *
+   * @param heartBeat heartbeat received from an agent
+   */
+  public void addHeartbeat(HeartBeat heartBeat) {
+    heartBeatsQueue.add(heartBeat);
+  }
+
+  /**
+   * Removes and returns the oldest queued heartbeat.
+   *
+   * @return the next heartbeat, or {@code null} when the queue is empty
+   */
+  private HeartBeat pollHeartbeat() {
+    return heartBeatsQueue.poll();
+  }
+
+  /**
+   * Scheduled task that drains the heartbeat queue: each run processes queued
+   * heartbeats one by one until the queue is empty or the processor is stopping.
+   */
+  private class HeartbeatProcessingTask implements Runnable {
+
+    @Override
+    public void run() {
+      boolean queueDrained = false;
+      while (shouldRun && !queueDrained) {
+        try {
+          HeartBeat next = pollHeartbeat();
+          if (next != null) {
+            processHeartbeat(next);
+          } else {
+            // nothing left to do until the next scheduled run
+            queueDrained = true;
+          }
+        } catch (Exception e) {
+          LOG.error("Exception received while processing heartbeat", e);
+        } catch (Throwable throwable) {
+          // an escaping throwable would cancel all further scheduled runs, so swallow everything
+          LOG.error("ERROR: ", throwable);
+        }
+      }
+    }
+  }
+
+  /**
+   * Encapsulates the logic for processing the data of one agent heartbeat: alerts,
+   * command reports, component status reports and finally the host status.
+   * @param heartbeat Agent heartbeat object
+   * @throws AmbariException
+   */
+  public void processHeartbeat(HeartBeat heartbeat) throws AmbariException {
+    // one timestamp is shared by everything derived from this heartbeat's command reports
+    long now = System.currentTimeMillis();
+
+    processAlerts(heartbeat);
+
+    processCommandReports(heartbeat, now);
+    processStatusReports(heartbeat);
+    //host status calculation is based on task and status reports, so it should be performed last
+    processHostStatus(heartbeat);
+  }
+
+
+
+  /**
+   * Publishes the {@link Alert}s carried by the heartbeat as a single
+   * {@link AlertReceivedEvent}. Alerts that do not name a host are attributed to
+   * the host that sent the heartbeat. Nothing is published when the heartbeat is
+   * {@code null} or carries no alert list.
+   *
+   * @param heartbeat
+   *          the heartbeat to process.
+   */
+  protected void processAlerts(HeartBeat heartbeat) {
+    if (heartbeat == null) {
+      return;
+    }
+
+    final String reportingHost = heartbeat.getHostname();
+    if (heartbeat.getAlerts() == null) {
+      return;
+    }
+
+    AlertEvent received = new AlertReceivedEvent(heartbeat.getAlerts());
+    for (Alert current : received.getAlerts()) {
+      // fill in the sender for alerts that arrived without a host name
+      if (null == current.getHostName()) {
+        current.setHostName(reportingHost);
+      }
+    }
+    alertEventPublisher.publish(received);
+  }
+
+  /**
+   * Updates the host health status based on the states of the components on the host.
+   * <p>
+   * Nothing is done while the current health status of the host is UNKNOWN. Otherwise the
+   * status is recalculated when the heartbeat carries component statuses, or a COMPLETED
+   * report of a command that is neither ACTIONEXECUTE nor a registered service action:
+   * HEALTHY when every counted master and slave is STARTED, UNHEALTHY when a counted
+   * master is not STARTED, ALERT otherwise. Only components whose effective maintenance
+   * state is OFF are counted. A host that belongs to no cluster is always set to HEALTHY.
+   *
+   * @param heartbeat heartbeat to process
+   * @throws AmbariException propagated from the calls made here
+   */
+  protected void processHostStatus(HeartBeat heartbeat) throws AmbariException {
+
+    String hostname = heartbeat.getHostname();
+    Host host = clusterFsm.getHost(hostname);
+    HostHealthStatus.HealthStatus healthStatus = host.getHealthStatus().getHealthStatus();
+
+    if (!healthStatus.equals(HostHealthStatus.HealthStatus.UNKNOWN)) {
+
+      List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus();
+      //Host status info can be calculated only if the agent returned statuses in the heartbeat
+      //Or, if a command was executed that can change component status
+      boolean calculateHostStatus = false;
+      String clusterName = null;
+      if (componentStatuses.size() > 0) {
+        calculateHostStatus = true;
+        // the cluster name is taken from the first component status only
+        for (ComponentStatus componentStatus : componentStatuses) {
+          clusterName = componentStatus.getClusterName();
+          break;
+        }
+      }
+
+      if (!calculateHostStatus) {
+        List<CommandReport> reports = heartbeat.getReports();
+        for (CommandReport report : reports) {
+          if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
+            continue;
+          }
+
+          // NOTE(review): a report without a service name would fail on toLowerCase() - confirm
+          // that agents always send one for non-ACTIONEXECUTE commands
+          String service = report.getServiceName();
+          if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
+            continue;
+          }
+          if (report.getStatus().equals("COMPLETED")) {
+            calculateHostStatus = true;
+            clusterName = report.getClusterName();
+            break;
+          }
+        }
+      }
+
+      if (calculateHostStatus) {
+        //Use actual component status to compute the host status
+        int masterCount = 0;
+        int mastersRunning = 0;
+        int slaveCount = 0;
+        int slavesRunning = 0;
+
+        StackId stackId;
+        Cluster cluster = clusterFsm.getCluster(clusterName);
+        stackId = cluster.getDesiredStackVersion();
+
+
+        List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname());
+        for (ServiceComponentHost scHost : scHosts) {
+          ComponentInfo componentInfo =
+              ambariMetaInfo.getComponent(stackId.getStackName(),
+                  stackId.getStackVersion(), scHost.getServiceName(),
+                  scHost.getServiceComponentName());
+
+          String status = scHost.getState().name();
+
+          String category = componentInfo.getCategory();
+
+          // components in maintenance mode do not influence the host status
+          if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, host)) {
+            if (category.equals("MASTER")) {
+              ++masterCount;
+              if (status.equals("STARTED")) {
+                ++mastersRunning;
+              }
+            } else if (category.equals("SLAVE")) {
+              ++slaveCount;
+              if (status.equals("STARTED")) {
+                ++slavesRunning;
+              }
+            }
+          }
+        }
+
+        if (masterCount == mastersRunning && slaveCount == slavesRunning) {
+          healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
+        } else if (masterCount > 0 && mastersRunning < masterCount) {
+          healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY;
+        } else {
+          // all masters are running, but at least one slave is not
+          healthStatus = HostHealthStatus.HealthStatus.ALERT;
+        }
+
+        host.setStatus(healthStatus.name());
+        host.persist();
+      }
+
+      //If the host doesn't belong to any cluster
+      if ((clusterFsm.getClustersForHost(host.getHostName())).size() == 0) {
+        healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
+        host.setStatus(healthStatus.name());
+        host.persist();
+      }
+    }
+  }
+
+  /**
+   * Process reports of tasks executed on agents.
+   * <p>
+   * For every report this publishes an {@link ActionFinalReportReceivedEvent} for finished
+   * ACTIONEXECUTE commands, records keytab distribution/removal for the host, and feeds
+   * START/STOP/INSTALL/... results into the state machine of the affected
+   * {@link ServiceComponentHost}. Reports of ABORTED commands are not turned into events.
+   * At the end all reports are handed to the action manager.
+   *
+   * @param heartbeat heartbeat to process
+   * @param now cached current time
+   * @throws AmbariException if a report that needs a component lookup has no service name,
+   *         or propagated from the cluster/host lookups
+   */
+  protected void processCommandReports(
+      HeartBeat heartbeat, long now)
+      throws AmbariException {
+    String hostname = heartbeat.getHostname();
+    List<CommandReport> reports = heartbeat.getReports();
+
+    // Cache HostRoleCommand entities because we will need them a few times
+    List<Long> taskIds = new ArrayList<Long>();
+    for (CommandReport report : reports) {
+      taskIds.add(report.getTaskId());
+    }
+    Collection<HostRoleCommand> commands = actionManager.getTasks(taskIds);
+
+    // NOTE(review): the loop below relies on getTasks() returning one command per report,
+    // in report order - confirm against ActionManager
+    Iterator<HostRoleCommand> hostRoleCommandIterator = commands.iterator();
+    for (CommandReport report : reports) {
+
+      Long clusterId = null;
+      if (report.getClusterName() != null) {
+        try {
+          Cluster cluster = clusterFsm.getCluster(report.getClusterName());
+          clusterId = cluster.getClusterId();
+        } catch (AmbariException e) {
+          // best effort: the cluster id stays null and the final-report event below
+          // is published without it
+        }
+      }
+
+      LOG.debug("Received command report: " + report);
+      // Fetch HostRoleCommand that corresponds to a given task ID
+      HostRoleCommand hostRoleCommand = hostRoleCommandIterator.next();
+      // look the host up through Clusters instead of hostDAO so the database is not touched
+      Host host = clusterFsm.getHost(hostname);
+      if (host == null) {
+        LOG.error("Received a command report and was unable to retrieve Host for hostname = " + hostname);
+        continue;
+      }
+
+      // Send event for final command reports for actions
+      if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
+          HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
+        ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
+            clusterId, hostname, report, false);
+        ambariEventPublisher.publish(event);
+      }
+
+      // Skip sending events for command reports for ABORTed commands
+      if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
+        continue;
+      }
+      if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
+          report.getStatus().equals("IN_PROGRESS")) {
+        hostRoleCommand.setStartTime(now);
+      }
+
+      // If the report indicates the keytab file was successfully transferred to a host or removed
+      // from a host, record this for future reference
+      if (Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
+          Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
+          RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
+          RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus())) {
+
+        String customCommand = report.getCustomCommand();
+
+        boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
+        if (adding || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
+          WriteKeytabsStructuredOut writeKeytabsStructuredOut;
+          try {
+            writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
+          } catch (JsonSyntaxException ex) {
+            //Json structure was incorrect do nothing, pass this data further for processing
+            writeKeytabsStructuredOut = null;
+          }
+
+          if (writeKeytabsStructuredOut != null) {
+            Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
+            if (keytabs != null) {
+              for (Map.Entry<String, String> entry : keytabs.entrySet()) {
+                String principal = entry.getKey();
+                if (adding) {
+                  // only create the record once
+                  if (!kerberosPrincipalHostDAO.exists(principal, host.getHostId())) {
+                    kerberosPrincipalHostDAO.create(principal, host.getHostId());
+                  }
+                } else if ("_REMOVED_".equalsIgnoreCase(entry.getValue())) {
+                  // removal must not depend on the record being absent, otherwise
+                  // existing records would never be deleted
+                  kerberosPrincipalHostDAO.remove(principal, host.getHostId());
+                }
+              }
+            }
+          }
+        }
+      }
+
+      //pass custom START, STOP and RESTART
+      if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
+          (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+              !("RESTART".equals(report.getCustomCommand()) ||
+                  "START".equals(report.getCustomCommand()) ||
+                  "STOP".equals(report.getCustomCommand())))) {
+        continue;
+      }
+
+      Cluster cl = clusterFsm.getCluster(report.getClusterName());
+      String service = report.getServiceName();
+      if (service == null || service.isEmpty()) {
+        throw new AmbariException("Invalid command report, service: " + service);
+      }
+      if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
+        LOG.debug(report.getRole() + " is an action - skip component lookup");
+      } else {
+        try {
+          Service svc = cl.getService(service);
+          ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
+          ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
+          String schName = scHost.getServiceComponentName();
+
+          if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {
+
+            // Reading component version if it is present
+            if (StringUtils.isNotBlank(report.getStructuredOut())) {
+              ComponentVersionStructuredOut structuredOutput = null;
+              try {
+                structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+              } catch (JsonSyntaxException ex) {
+                //Json structure for component version was incorrect
+                //do nothing, pass this data further for processing
+              }
+
+              String newVersion = structuredOutput == null ? null : structuredOutput.version;
+
+              // Pass true to always publish a version event.  It is safer to recalculate the version even if we don't
+              // detect a difference in the value.  This is useful in case that a manual database edit is done while
+              // ambari-server is stopped.
+              handleComponentVersionReceived(cl, scHost, newVersion, true);
+            }
+
+            // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
+            if (scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING)) {
+              scHost.setStackVersion(scHost.getDesiredStackVersion());
+            } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
+                (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
+                    ("START".equals(report.getCustomCommand()) ||
+                        "RESTART".equals(report.getCustomCommand()))))
+                && null != report.getConfigurationTags()
+                && !report.getConfigurationTags().isEmpty()) {
+              LOG.info("Updating applied config on service " + scHost.getServiceName() +
+                  ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
+              scHost.updateActualConfigs(report.getConfigurationTags());
+              scHost.setRestartRequired(false);
+            }
+            // Necessary for resetting clients stale configs after starting service
+            if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                    "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){
+              scHost.updateActualConfigs(report.getConfigurationTags());
+              scHost.setRestartRequired(false);
+            }
+            if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                !("START".equals(report.getCustomCommand()) ||
+                    "STOP".equals(report.getCustomCommand()))) {
+              //do not affect states for custom commands except START and STOP
+              //let status commands be responsible for this
+              continue;
+            }
+
+            if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                    "START".equals(report.getCustomCommand()))) {
+              scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
+                  hostname, now));
+              scHost.setRestartRequired(false);
+            } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                    "STOP".equals(report.getCustomCommand()))) {
+              scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
+                  hostname, now));
+            } else {
+              scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
+                  hostname, now));
+            }
+          } else if (report.getStatus().equals("FAILED")) {
+
+            if (StringUtils.isNotBlank(report.getStructuredOut())) {
+              try {
+                ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
+
+                // fromJson yields null for a JSON "null" document, so guard the dereference
+                if (null != structuredOutput && null != structuredOutput.upgradeDirection
+                    && structuredOutput.upgradeDirection.isUpgrade()) {
+                  scHost.setUpgradeState(UpgradeState.FAILED);
+                }
+              } catch (JsonSyntaxException ex) {
+                LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
+              }
+            }
+
+            LOG.warn("Operation failed - may be retried. Service component host: "
+                + schName + ", host: " + hostname + " Action id" + report.getActionId());
+            if (actionManager.isInProgressCommand(report)) {
+              scHost.handleEvent(new ServiceComponentHostOpFailedEvent
+                  (schName, hostname, now));
+            } else {
+              LOG.info("Received report for a command that is no longer active. " + report);
+            }
+          } else if (report.getStatus().equals("IN_PROGRESS")) {
+            scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
+                hostname, now));
+          }
+        } catch (ServiceComponentNotFoundException scnex) {
+          LOG.warn("Service component not found ", scnex);
+        } catch (InvalidStateTransitionException ex) {
+          // the stack trace is only worth logging when debugging is enabled
+          if (LOG.isDebugEnabled()) {
+            LOG.warn("State machine exception.", ex);
+          } else {
+            LOG.warn("State machine exception. " + ex.getMessage());
+          }
+        }
+      }
+    }
+
+    //Update state machines from reports
+    actionManager.processTaskResponse(hostname, reports, commands);
+  }
+
+  /**
+   * Process reports of status commands.
+   * <p>
+   * For every component status of the heartbeat that belongs to one of the host's clusters,
+   * the matching {@link ServiceComponentHost} is updated with the reported live state,
+   * security state, stack version, config tags, processes and version. Statuses that refer
+   * to unknown services, components or host components, or that carry an invalid payload,
+   * are logged and skipped.
+   *
+   * @param heartbeat heartbeat to process
+   * @throws AmbariException
+   */
+  protected void processStatusReports(HeartBeat heartbeat) throws AmbariException {
+    String hostname = heartbeat.getHostname();
+    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
+    for (Cluster cl : clusters) {
+      for (ComponentStatus status : heartbeat.componentStatus) {
+        if (status.getClusterName().equals(cl.getClusterName())) {
+          try {
+            Service svc = cl.getService(status.getServiceName());
+
+            String componentName = status.getComponentName();
+            if (svc.getServiceComponents().containsKey(componentName)) {
+              ServiceComponent svcComp = svc.getServiceComponent(
+                  componentName);
+              ServiceComponentHost scHost = svcComp.getServiceComponentHost(
+                  hostname);
+              org.apache.ambari.server.state.State prevState = scHost.getState();
+              org.apache.ambari.server.state.State liveState =
+                  org.apache.ambari.server.state.State.valueOf(org.apache.ambari.server.state.State.class,
+                      status.getStatus());
+              // the reported state is applied only on top of these previous states;
+              // any other previous state is left untouched
+              if (prevState.equals(org.apache.ambari.server.state.State.INSTALLED)
+                  || prevState.equals(org.apache.ambari.server.state.State.STARTED)
+                  || prevState.equals(org.apache.ambari.server.state.State.STARTING)
+                  || prevState.equals(org.apache.ambari.server.state.State.STOPPING)
+                  || prevState.equals(org.apache.ambari.server.state.State.UNKNOWN)) {
+                scHost.setState(liveState); //TODO direct status set breaks state machine sometimes !!!
+                if (!prevState.equals(liveState)) {
+                  LOG.info("State of service component " + componentName
+                      + " of service " + status.getServiceName()
+                      + " of cluster " + status.getClusterName()
+                      + " has changed from " + prevState + " to " + liveState
+                      + " at host " + hostname);
+                }
+              }
+
+              SecurityState prevSecurityState = scHost.getSecurityState();
+              SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
+              if((prevSecurityState != currentSecurityState)) {
+                // a reported security state only replaces an endpoint state
+                if(prevSecurityState.isEndpoint()) {
+                  scHost.setSecurityState(currentSecurityState);
+                  LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
+                          "has changed from %s to %s on host %s",
+                      componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
+                      currentSecurityState, hostname));
+                }
+                else {
+                  LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
+                          "has changed from %s to %s on host %s but will be ignored since %s is a " +
+                          "transitional state",
+                      componentName, status.getServiceName(), status.getClusterName(),
+                      prevSecurityState, currentSecurityState, hostname, prevSecurityState));
+                }
+              }
+
+              // the stack version arrives as a JSON document that maps onto StackId
+              if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
+                scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
+              }
+
+              if (null != status.getConfigTags()) {
+                scHost.updateActualConfigs(status.getConfigTags());
+              }
+
+              Map<String, Object> extra = status.getExtra();
+              if (null != extra && !extra.isEmpty()) {
+                try {
+                  if (extra.containsKey("processes")) {
+                    @SuppressWarnings("unchecked")
+                    List<Map<String, String>> list = (List<Map<String, String>>) extra.get("processes");
+                    scHost.setProcesses(list);
+                  }
+                  if (extra.containsKey("version")) {
+                    String version = extra.get("version").toString();
+
+                    // publish a version event only when the version actually changed
+                    handleComponentVersionReceived(cl, scHost, version, false);
+                  }
+
+                } catch (Exception e) {
+                  LOG.error("Could not access extra JSON for " +
+                      scHost.getServiceComponentName() + " from " +
+                      scHost.getHostName() + ": " + status.getExtra() +
+                      " (" + e.getMessage() + ")");
+                }
+              }
+
+              this.heartbeatMonitor.getAgentRequests()
+                  .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
+            } else {
+              // TODO: What should be done otherwise?
+            }
+          } catch (ServiceNotFoundException e) {
+            LOG.warn("Received a live status update for a non-initialized"
+                + " service"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName());
+            // FIXME ignore invalid live update and continue for now?
+            continue;
+          } catch (ServiceComponentNotFoundException e) {
+            LOG.warn("Received a live status update for a non-initialized"
+                + " servicecomponent"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName()
+                + ", componentName=" + status.getComponentName());
+            // FIXME ignore invalid live update and continue for now?
+            continue;
+          } catch (ServiceComponentHostNotFoundException e) {
+            LOG.warn("Received a live status update for a non-initialized"
+                + " service"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName()
+                + ", componentName=" + status.getComponentName()
+                + ", hostname=" + hostname);
+            // FIXME ignore invalid live update and continue for now?
+            continue;
+          } catch (RuntimeException e) {
+            // e.g. an unknown State or SecurityState name rejected by valueOf above
+            LOG.warn("Received a live status with invalid payload"
+                + " service"
+                + ", clusterName=" + status.getClusterName()
+                + ", serviceName=" + status.getServiceName()
+                + ", componentName=" + status.getComponentName()
+                + ", hostname=" + hostname
+                + ", error=" + e.getMessage());
+            continue;
+          }
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Stores a newly reported component version on the service component host and
+   * publishes a {@link HostComponentVersionEvent}.
+   * <p>
+   * A blank {@code newVersion}, or one equal to the stored version, changes nothing. On a
+   * change the stack version is set to the cluster's desired stack version, and the upgrade
+   * state is set to COMPLETE unless the previous version was {@code null} or UNKNOWN.
+   *
+   * @param cluster        the cluster
+   * @param scHost         service component host
+   * @param newVersion     new version of service component
+   * @param alwaysPublish  if true, always publish a version event; if false,
+   *                       only publish if the component version was updated
+   */
+  private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
+                                              String newVersion, boolean alwaysPublish) {
+
+    boolean versionChanged = false;
+
+    if (StringUtils.isNotBlank(newVersion)) {
+      final String knownVersion = scHost.getVersion();
+      versionChanged = !StringUtils.equals(knownVersion, newVersion);
+
+      if (versionChanged) {
+        scHost.setVersion(newVersion);
+        scHost.setStackVersion(cluster.getDesiredStackVersion());
+
+        boolean hadRealVersion = knownVersion != null
+            && !knownVersion.equalsIgnoreCase(org.apache.ambari.server.state.State.UNKNOWN.toString());
+        if (hadRealVersion) {
+          scHost.setUpgradeState(UpgradeState.COMPLETE);
+        }
+      }
+    }
+
+    if (!versionChanged && !alwaysPublish) {
+      return;
+    }
+
+    versionEventPublisher.publish(new HostComponentVersionEvent(cluster, scHost));
+  }
+
+  /**
+   * This class is used for mapping the JSON structured output of keytab distribution
+   * actions (the SET_KEYTAB and REMOVE_KEYTAB custom commands).
+   */
+  private static class WriteKeytabsStructuredOut {
+    // JSON property "keytabs"; the keys are used as principal names by the report processing
+    @SerializedName("keytabs")
+    private Map<String,String> keytabs;
+
+    public Map<String, String> getKeytabs() {
+      return keytabs;
+    }
+
+    public void setKeytabs(Map<String, String> keytabs) {
+      this.keytabs = keytabs;
+    }
+  }
+
+
+  /**
+   * This class is used for mapping the JSON structured output of component commands
+   * (for example START) that report a component version and upgrade details.
+   */
+  private static class ComponentVersionStructuredOut {
+    // JSON property "version": the version reported for the component
+    @SerializedName("version")
+    private String version;
+
+    // JSON property "upgrade_type"
+    @SerializedName("upgrade_type")
+    private UpgradeType upgradeType = null;
+
+    // JSON property "direction": checked when a failed command is reported
+    @SerializedName("direction")
+    private Direction upgradeDirection = null;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
index 040876a..e28f9ef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostVersionDAO.java
@@ -131,8 +131,8 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
   }
 
   /**
-   * Retrieve all of the host versions for the given cluster name, host name, and state.
-   *
+   * Retrieve all of the host versions for the given cluster name, host name, and state. <br/>
+   * Consider using the faster method instead: {@link HostVersionDAO#findByClusterHostAndState(long, long, org.apache.ambari.server.state.RepositoryVersionState)}
    * @param clusterName Cluster name
    * @param hostName FQDN of host
    * @param state repository version state
@@ -150,8 +150,29 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
   }
 
   /**
+   * Faster version of {@link HostVersionDAO#findByClusterHostAndState(java.lang.String, java.lang.String, org.apache.ambari.server.state.RepositoryVersionState)}
+   *
+   * @param clusterId Cluster ID
+   * @param hostId Host ID
+   * @param state repository version state
+   * @return Return all of the host versions that match the criteria.
+   */
+  @RequiresSession
+  public List<HostVersionEntity> findByClusterHostAndState(long clusterId, long hostId, RepositoryVersionState state) {
+    // Build the id-based named query and bind its three filters by name.
+    final TypedQuery<HostVersionEntity> hostVersionsQuery = entityManagerProvider.get()
+        .createNamedQuery("hostVersionByClusterHostIdAndState", HostVersionEntity.class)
+        .setParameter("clusterId", clusterId)
+        .setParameter("hostId", hostId)
+        .setParameter("state", state);
+
+    return daoUtils.selectList(hostVersionsQuery);
+  }
+
+  /**
    * Retrieve the single host version whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, of which there should be exactly one at all times
    * for the given host.
+   * Consider using the faster method {@link HostVersionDAO#findByHostAndStateCurrent(long, long)} instead.
    *
    * @param clusterName Cluster name
    * @param hostName Host name
@@ -175,8 +196,36 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
   }
 
   /**
+   * Retrieve the single host version whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, of which there should be exactly one at all times
+   * for the given host.
+   * Faster version of {@link HostVersionDAO#findByHostAndStateCurrent(java.lang.String, java.lang.String)}
+   * @param clusterId Cluster ID
+   * @param hostId host ID
+   * @return Returns the single host version for this host whose state is {@link org.apache.ambari.server.state.RepositoryVersionState#CURRENT}, or {@code null} if there is none; a {@code NonUniqueResultException} is thrown if more than one is found.
+   */
+  @RequiresSession
+  public HostVersionEntity findByHostAndStateCurrent(long clusterId, long hostId) {
+    try {
+      // The id-based lookup already returns a typed list, so no cast is needed below.
+      final List<HostVersionEntity> currentVersions = findByClusterHostAndState(clusterId, hostId, RepositoryVersionState.CURRENT);
+      if (currentVersions.isEmpty()) {
+        return null;
+      }
+      if (currentVersions.size() > 1) {
+        throw new NonUniqueResultException(); // more than one CURRENT row for the host
+      }
+      return currentVersions.get(0);
+    } catch (NoResultException ignored) {
+      return null;
+    }
+  }
+
+  /**
    * Retrieve the single host version for the given cluster, stack name, stack
-   * version, and host name.
+   * version, and host name. <br/>
+   * This query is slow and not suitable for frequent use. <br/>
+   * Please use {@link HostVersionDAO#findByClusterStackVersionAndHost(long, org.apache.ambari.server.state.StackId, java.lang.String, long)} instead; <br/>
+   * it is ~50 times faster.
    *
    * @param clusterName
    *          Cluster name
@@ -203,6 +252,29 @@ public class HostVersionDAO extends CrudDAO<HostVersionEntity, Long> {
     return daoUtils.selectSingle(query);
   }
 
+  /**
+   * Id-based variant of {@link HostVersionDAO#findByClusterStackVersionAndHost(java.lang.String, org.apache.ambari.server.state.StackId, java.lang.String, java.lang.String)}
+   * @param clusterId Id of the cluster
+   * @param stackId Stack ID (e.g., HDP-2.2); its stack name and stack version are bound separately
+   * @param version Stack version (e.g., 2.2.0.1-995)
+   * @param hostId Id of the host
+   * @return Returns the single host version selected for these criteria.
+   */
+  @RequiresSession
+  public HostVersionEntity findByClusterStackVersionAndHost(long clusterId, StackId stackId, String version,
+                                                            long hostId) {
+    // Build the id-based named query and bind all five filters by name.
+    final TypedQuery<HostVersionEntity> hostVersionQuery = entityManagerProvider.get()
+        .createNamedQuery("hostVersionByClusterStackVersionAndHostId", HostVersionEntity.class)
+        .setParameter("clusterId", clusterId)
+        .setParameter("stackName", stackId.getStackName())
+        .setParameter("stackVersion", stackId.getStackVersion())
+        .setParameter("version", version)
+        .setParameter("hostId", hostId);
+
+    return daoUtils.selectSingle(hostVersionQuery);
+  }
+
   @Transactional
   public void removeByHostName(String hostName) {
     Collection<HostVersionEntity> hostVersions = this.findByHost(hostName);

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
index b69518b..6be4b50 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostVersionEntity.java
@@ -62,6 +62,15 @@ import org.apache.ambari.server.state.RepositoryVersionState;
         "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
             "WHERE clusters.clusterName=:clusterName AND hostVersion.repositoryVersion.stack.stackName=:stackName AND hostVersion.repositoryVersion.stack.stackVersion=:stackVersion AND hostVersion.repositoryVersion.version=:version AND " +
             "hostVersion.hostEntity.hostName=:hostName"),
+
+    @NamedQuery(name = "hostVersionByClusterHostIdAndState", query =
+        "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
+            "WHERE clusters.clusterId=:clusterId AND hostVersion.hostId=:hostId AND hostVersion.state=:state"),
+
+    @NamedQuery(name = "hostVersionByClusterStackVersionAndHostId", query =
+        "SELECT hostVersion FROM HostVersionEntity hostVersion JOIN hostVersion.hostEntity host JOIN host.clusterEntities clusters " +
+        "WHERE hostVersion.hostId=:hostId AND clusters.clusterId=:clusterId AND hostVersion.repositoryVersion.stack.stackName=:stackName " +
+        "AND hostVersion.repositoryVersion.stack.stackVersion=:stackVersion AND hostVersion.repositoryVersion.version=:version")
 })
 public class HostVersionEntity {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 4212975..c6d01e8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -1560,8 +1560,8 @@ public class ClusterImpl implements Cluster {
     StackId repoVersionStackId = new StackId(repoVersionStackEntity);
 
     HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(
-      getClusterName(), repoVersionStackId, repositoryVersion.getVersion(),
-      host.getHostName());
+      getClusterId(), repoVersionStackId, repositoryVersion.getVersion(),
+      host.getHostId());
 
     hostTransitionStateWriteLock.lock();
     try {
@@ -1576,7 +1576,7 @@ public class ClusterImpl implements Cluster {
         hostVersionDAO.create(hostVersionEntity);
       }
 
-      HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterName(), host.getHostName());
+      HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterId(), host.getHostId());
       boolean isCurrentPresent = (currentVersionEntity != null);
       final ServiceComponentHostSummary hostSummary = new ServiceComponentHostSummary(ambariMetaInfo, host, stack);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/083ac6da/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index bfb6214..1bd60a8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -1319,43 +1319,48 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
 
   @Override
   public ServiceComponentHostResponse convertToResponse() {
-    readLock.lock();
+    clusterGlobalLock.readLock().lock();
     try {
-      HostComponentStateEntity hostComponentStateEntity = getStateEntity();
-      if (null == hostComponentStateEntity) {
-        LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
-        return null;
-      }
+      readLock.lock();
+      try {
+        HostComponentStateEntity hostComponentStateEntity = getStateEntity();
+        if (null == hostComponentStateEntity) {
+          LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
+          return null;
+        }
 
-      String clusterName = serviceComponent.getClusterName();
-      String serviceName = serviceComponent.getServiceName();
-      String serviceComponentName = serviceComponent.getName();
-      String hostName = getHostName();
-      String state = getState().toString();
-      String stackId = getStackVersion().getStackId();
-      String desiredState = getDesiredState().toString();
-      String desiredStackId = getDesiredStackVersion().getStackId();
-      HostComponentAdminState componentAdminState = getComponentAdminState();
-      UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
-
-      ServiceComponentHostResponse r = new ServiceComponentHostResponse(
-          clusterName, serviceName,
-          serviceComponentName, hostName, state,
-          stackId, desiredState,
-          desiredStackId, componentAdminState);
-
-      r.setActualConfigs(actualConfigs);
-      r.setUpgradeState(upgradeState);
+        String clusterName = serviceComponent.getClusterName();
+        String serviceName = serviceComponent.getServiceName();
+        String serviceComponentName = serviceComponent.getName();
+        String hostName = getHostName();
+        String state = getState().toString();
+        String stackId = getStackVersion().getStackId();
+        String desiredState = getDesiredState().toString();
+        String desiredStackId = getDesiredStackVersion().getStackId();
+        HostComponentAdminState componentAdminState = getComponentAdminState();
+        UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
+
+        ServiceComponentHostResponse r = new ServiceComponentHostResponse(
+            clusterName, serviceName,
+            serviceComponentName, hostName, state,
+            stackId, desiredState,
+            desiredStackId, componentAdminState);
+
+        r.setActualConfigs(actualConfigs);
+        r.setUpgradeState(upgradeState);
 
-      try {
-        r.setStaleConfig(helper.isStaleConfigs(this));
-      } catch (Exception e) {
-        LOG.error("Could not determine stale config", e);
-      }
+        try {
+          r.setStaleConfig(helper.isStaleConfigs(this));
+        } catch (Exception e) {
+          LOG.error("Could not determine stale config", e);
+        }
 
-      return r;
+        return r;
+      } finally {
+        readLock.unlock();
+      }
     } finally {
-      readLock.unlock();
+      clusterGlobalLock.readLock().unlock();
     }
   }
 
@@ -1797,6 +1802,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
     }
 
     final String hostName = getHostName();
+    final long hostId = getHost().getHostId();
     final Set<Cluster> clustersForHost = clusters.getClustersForHost(hostName);
     if (clustersForHost.size() != 1) {
       throw new AmbariException("Host " + hostName + " should be assigned only to one cluster");
@@ -1815,7 +1821,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
         repositoryVersion = createRepositoryVersion(version, stackId, stackInfo);
       }
 
-      final HostEntity host = hostDAO.findByName(hostName);
+      final HostEntity host = hostDAO.findById(hostId);
       cluster.transitionHostVersionState(host, repositoryVersion, stackId);
     } finally {
       writeLock.unlock();