You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/06/11 10:25:47 UTC

[ambari] branch trunk updated: AMBARI-24062 Command retry on server when agent-server connection drops. (#1497)

This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1662396  AMBARI-24062 Command retry on server when agent-server connection drops. (#1497)
1662396 is described below

commit 1662396ace59fdd8ae8e882308e76013c2fe7677
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Mon Jun 11 13:25:44 2018 +0300

    AMBARI-24062 Command retry on server when agent-server connection drops. (#1497)
    
    * AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
    
    * AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
    
    * AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
    
    * AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
    
    * AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
---
 .../src/main/python/ambari_agent/Constants.py      |   2 +-
 .../ambari/server/agent/HeartBeatHandler.java      |   9 +
 .../ambari/server/agent/HeartbeatMonitor.java      |  83 ++++++---
 .../server/agent/stomp/AgentReportsController.java |  17 ++
 .../stomp/dto/AckReport.java}                      |  68 +++++---
 .../ambari/server/configuration/Configuration.java |  32 +++-
 .../configuration/spring/AgentStompConfig.java     |   8 +
 .../configuration/spring/ApiStompConfig.java       |   3 +-
 .../configuration/spring/RootStompConfig.java      |  17 +-
 .../controller/AmbariManagementController.java     |  20 +++
 .../controller/AmbariManagementControllerImpl.java |  18 +-
 .../apache/ambari/server/events/AmbariEvent.java   |   7 +-
 .../server/events/DefaultMessageEmitter.java       |  52 +++++-
 .../ambari/server/events/HostRegisteredEvent.java  |   9 +-
 .../ambari/server/events/MessageEmitter.java       | 194 ++++++++++++++++++++-
 ...gisteredEvent.java => MessageNotDelivered.java} |  30 +---
 .../apache/ambari/server/events/STOMPEvent.java    |  14 ++
 .../listeners/requests/STOMPUpdateListener.java    |  17 +-
 .../ambari/server/state/cluster/ClustersImpl.java  |   5 -
 .../ambari/server/topology/AmbariContext.java      |  11 +-
 .../topology/ClusterConfigurationRequest.java      |   2 +
 .../apache/ambari/server/state/host/HostTest.java  |   3 +
 22 files changed, 503 insertions(+), 118 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 09381eb..ed6b482 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -31,7 +31,7 @@ AGENT_ACTIONS_TOPIC = '/user/agent_actions'
 PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, AGENT_ACTIONS_TOPIC]
 POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC]
 
-AGENT_RESPONSES_TOPIC = '/agents/responses'
+AGENT_RESPONSES_TOPIC = '/reports/responses'
 TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
 METADATA_REQUEST_ENDPOINT = '/agents/metadata'
 CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 918a8fa..7d70390 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -30,6 +30,8 @@ import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.events.AgentActionEvent;
+import org.apache.ambari.server.events.HostRegisteredEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
 import org.apache.ambari.server.state.AgentVersion;
 import org.apache.ambari.server.state.Cluster;
@@ -96,6 +98,9 @@ public class HeartBeatHandler {
   private AgentSessionManager agentSessionManager;
 
   @Inject
+  private AmbariEventPublisher ambariEventPublisher;
+
+  @Inject
   private AlertHelper alertHelper;
 
   private Map<String, Long> hostResponseIds = new ConcurrentHashMap<>();
@@ -347,6 +352,10 @@ public class HeartBeatHandler {
         new AgentVersion(register.getAgentVersion()), now, register.getHardwareProfile(),
         register.getAgentEnv(), register.getAgentStartTime()));
 
+    // publish the event
+    HostRegisteredEvent event = new HostRegisteredEvent(hostname, hostObject.getHostId());
+    ambariEventPublisher.publish(event);
+
     RegistrationResponse response = new RegistrationResponse();
 
     Long requestId = 0L;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 7961754..5a060f6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -40,6 +40,8 @@ import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.events.MessageNotDelivered;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.CommandScriptDefinition;
@@ -61,6 +63,7 @@ import org.apache.ambari.server.state.host.HostHeartbeatLostEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Injector;
 
 /**
@@ -78,6 +81,7 @@ public class HeartbeatMonitor implements Runnable {
   private final AmbariManagementController ambariManagementController;
   private final Configuration configuration;
   private final AgentRequests agentRequests;
+  private final AmbariEventPublisher ambariEventPublisher;
 
   public HeartbeatMonitor(Clusters clusters, ActionManager am,
                           int threadWakeupInterval, Injector injector) {
@@ -90,6 +94,8 @@ public class HeartbeatMonitor implements Runnable {
             AmbariManagementController.class);
     configuration = injector.getInstance(Configuration.class);
     agentRequests = new AgentRequests();
+    ambariEventPublisher = injector.getInstance(AmbariEventPublisher.class);
+    ambariEventPublisher.register(this);
   }
 
   public void shutdown() {
@@ -142,43 +148,17 @@ public class HeartbeatMonitor implements Runnable {
         //do not check if host already known be lost
         continue;
       }
-      String host = hostObj.getHostName();
+      Long hostId = hostObj.getHostId();
       HostState hostState = hostObj.getState();
-      String hostname = hostObj.getHostName();
 
       long lastHeartbeat = 0;
       try {
-        lastHeartbeat = clusters.getHost(host).getLastHeartbeatTime();
+        lastHeartbeat = clusters.getHostById(hostId).getLastHeartbeatTime();
       } catch (AmbariException e) {
         LOG.warn("Exception in getting host object; Is it fatal?", e);
       }
       if (lastHeartbeat + 2 * threadWakeupInterval < now) {
-        LOG.warn("Heartbeat lost from host " + host);
-        //Heartbeat is expired
-        hostObj.handleEvent(new HostHeartbeatLostEvent(host));
-
-        // mark all components that are not clients with unknown status
-        for (Cluster cluster : clusters.getClustersForHost(hostObj.getHostName())) {
-          for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostObj.getHostName())) {
-            Service s = cluster.getService(sch.getServiceName());
-            ServiceComponent sc = s.getServiceComponent(sch.getServiceComponentName());
-            if (!sc.isClientComponent() &&
-              !sch.getState().equals(State.INIT) &&
-              !sch.getState().equals(State.INSTALLING) &&
-              !sch.getState().equals(State.INSTALL_FAILED) &&
-              !sch.getState().equals(State.UNINSTALLED) &&
-              !sch.getState().equals(State.DISABLED)) {
-              LOG.warn("Setting component state to UNKNOWN for component " + sc.getName() + " on " + host);
-              State oldState = sch.getState();
-              sch.setState(State.UNKNOWN);
-              sch.setLastValidState(oldState);
-            }
-          }
-        }
-
-        //Purge action queue
-        //notify action manager
-        actionManager.handleLostHost(host);
+        handleHeartbeatLost(hostId);
       }
       if (hostState == HostState.WAITING_FOR_HOST_STATUS_UPDATES) {
         long timeSpentInState = hostObj.getTimeInState();
@@ -343,4 +323,49 @@ public class HeartbeatMonitor implements Runnable {
 
     return statusCmd;
   }
+
+  private void handleHeartbeatLost(Long hostId) throws AmbariException, InvalidStateTransitionException {
+    Host hostObj = clusters.getHostById(hostId);
+    String host = hostObj.getHostName();
+    LOG.warn("Heartbeat lost from host " + host);
+    //Heartbeat is expired
+    hostObj.handleEvent(new HostHeartbeatLostEvent(host));
+
+    // mark all components that are not clients with unknown status
+    for (Cluster cluster : clusters.getClustersForHost(hostObj.getHostName())) {
+      for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostObj.getHostName())) {
+        Service s = cluster.getService(sch.getServiceName());
+        ServiceComponent sc = s.getServiceComponent(sch.getServiceComponentName());
+        if (!sc.isClientComponent() &&
+            !sch.getState().equals(State.INIT) &&
+            !sch.getState().equals(State.INSTALLING) &&
+            !sch.getState().equals(State.INSTALL_FAILED) &&
+            !sch.getState().equals(State.UNINSTALLED) &&
+            !sch.getState().equals(State.DISABLED)) {
+          LOG.warn("Setting component state to UNKNOWN for component " + sc.getName() + " on " + host);
+          State oldState = sch.getState();
+          sch.setState(State.UNKNOWN);
+          sch.setLastValidState(oldState);
+        }
+      }
+    }
+
+    //Purge action queue
+    //notify action manager
+    actionManager.handleLostHost(host);
+  }
+
+  @Subscribe
+  public void onMessageNotDelivered(MessageNotDelivered messageNotDelivered) {
+    try {
+      Host hostObj = clusters.getHostById(messageNotDelivered.getHostId());
+      if (hostObj.getState() == HostState.HEARTBEAT_LOST) {
+        // nothing to do if the host is already known to be lost
+        return;
+      }
+      handleHeartbeatLost(messageNotDelivered.getHostId());
+    } catch (Exception e) {
+      LOG.error("Error during host to heartbeat lost moving", e);
+    }
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 1ba0063..249a141 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -22,23 +22,28 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import javax.inject.Provider;
 import javax.ws.rs.WebApplicationException;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.HostNotRegisteredException;
 import org.apache.ambari.server.agent.AgentReport;
 import org.apache.ambari.server.agent.AgentReportsProcessor;
 import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ComponentStatus;
 import org.apache.ambari.server.agent.HeartBeatHandler;
+import org.apache.ambari.server.agent.stomp.dto.AckReport;
 import org.apache.ambari.server.agent.stomp.dto.CommandStatusReports;
 import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReport;
 import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReports;
 import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.MessageMapping;
 import org.springframework.messaging.simp.annotation.SendToUser;
@@ -51,6 +56,10 @@ import com.google.inject.Injector;
 @MessageMapping("/reports")
 public class AgentReportsController {
   private static final Logger LOG = LoggerFactory.getLogger(AgentReportsController.class);
+
+  @Autowired
+  private Provider<DefaultMessageEmitter> defaultMessageEmitterProvider;
+
   private final HeartBeatHandler hh;
   private final AgentSessionManager agentSessionManager;
   private final AgentReportsProcessor agentReportsProcessor;
@@ -110,4 +119,12 @@ public class AgentReportsController {
     return new ReportsResponse();
   }
 
+  @MessageMapping("/responses")
+  public ReportsResponse handleReceiveReport(AckReport ackReport) throws HostNotRegisteredException {
+    LOG.debug("Handling agent receive report for execution message with messageId {}, status {}, reason {}",
+        ackReport.getMessageId(), ackReport.getStatus(), ackReport.getReason());
+    defaultMessageEmitterProvider.get().processReceiveReport(ackReport);
+    return new ReportsResponse();
+  }
+
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/AckReport.java
similarity index 51%
copy from ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
copy to ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/AckReport.java
index 66ae977..01a654d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/AckReport.java
@@ -15,30 +15,50 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events;
+package org.apache.ambari.server.agent.stomp.dto;
 
-/**
- * The {@link HostRegisteredEvent} class is fired when a host registered with
- * the server.
- */
-public class HostRegisteredEvent extends HostEvent {
-  /**
-   * Constructor.
-   *
-   * @param hostName
-   */
-  public HostRegisteredEvent(String hostName) {
-    super(AmbariEventType.HOST_REGISTERED, hostName);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String toString() {
-    StringBuilder buffer = new StringBuilder("HostRegistered{ ");
-    buffer.append("hostName=").append(m_hostName);
-    buffer.append("}");
-    return buffer.toString();
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class AckReport {
+
+  @JsonProperty("status")
+  private AckStatus status;
+
+  @JsonProperty("reason")
+  private String reason;
+
+  @JsonProperty("messageId")
+  private Long messageId;
+
+  public AckReport() {
+  }
+
+  public AckStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(AckStatus status) {
+    this.status = status;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  public void setReason(String reason) {
+    this.reason = reason;
+  }
+
+  public Long getMessageId() {
+    return messageId;
+  }
+
+  public void setMessageId(Long messageId) {
+    this.messageId = messageId;
+  }
+
+  public enum AckStatus {
+    OK,
+    ERROR
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 241edc2..8d1f520 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1866,14 +1866,28 @@ public class Configuration {
    */
   @Markdown(description = "The maximum size of an incoming stomp text message. Default is 2 MB.")
   public static final ConfigurationProperty<Integer> STOMP_MAX_INCOMING_MESSAGE_SIZE = new ConfigurationProperty<>(
-      "stomp.max.message.size", 2*1024*1024);
+      "stomp.max_incoming.message.size", 2*1024*1024);
 
   /**
    * The maximum size of a buffer for stomp message sending. Default is 5 MB.
    */
   @Markdown(description = "The maximum size of a buffer for stomp message sending. Default is 5 MB.")
   public static final ConfigurationProperty<Integer> STOMP_MAX_BUFFER_MESSAGE_SIZE = new ConfigurationProperty<>(
-      "stomp.max.message.size", 5*1024*1024);
+      "stomp.max_buffer.message.size", 5*1024*1024);
+
+  /**
+   * The number of attempts to emit execution command message to agent. Default is 4
+   */
+  @Markdown(description = "The number of attempts to emit execution command message to agent. Default is 4")
+  public static final ConfigurationProperty<Integer> EXECUTION_COMMANDS_RETRY_COUNT = new ConfigurationProperty<>(
+      "execution.command.retry.count", 4);
+
+  /**
+   * The interval in seconds between attempts to emit execution command message to agent. Default is 15
+   */
+  @Markdown(description = "The interval in seconds between attempts to emit execution command message to agent. Default is 15")
+  public static final ConfigurationProperty<Integer> EXECUTION_COMMANDS_RETRY_INTERVAL = new ConfigurationProperty<>(
+      "execution.command.retry.interval", 15);
 
   /**
    * The maximum number of threads used to extract Ambari Views when Ambari
@@ -4598,6 +4612,20 @@ public class Configuration {
   }
 
   /**
+   * @return the number of attempts to emit execution command message to agent. Default is 4
+   */
+  public int getExecutionCommandsRetryCount() {
+    return Integer.parseInt(getProperty(EXECUTION_COMMANDS_RETRY_COUNT));
+  }
+
+  /**
+   * @return the interval in seconds between attempts to emit execution command message to agent. Default is 15
+   */
+  public int getExecutionCommandsRetryInterval() {
+    return Integer.parseInt(getProperty(EXECUTION_COMMANDS_RETRY_INTERVAL));
+  }
+
+  /**
    * @return max thread pool size for agents, default 25
    */
   public int getAgentThreadPoolSize() {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
index 7084e8f..e1251d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
@@ -21,8 +21,11 @@ import javax.servlet.ServletContext;
 
 import org.apache.ambari.server.agent.stomp.HeartbeatController;
 import org.apache.ambari.server.api.stomp.TestController;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
+import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
 import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
@@ -53,6 +56,11 @@ public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
     configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class);
   }
 
+  @Bean
+  public STOMPUpdateListener requestSTOMPListener(Injector injector) {
+    return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES);
+  }
+
   public DefaultHandshakeHandler getHandshakeHandler() {
     WebSocketServerFactory webSocketServerFactory = new WebSocketServerFactory(servletContext);
     webSocketServerFactory.getPolicy().setMaxTextMessageSize(configuration.getStompMaxIncomingMessageSize());
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
index 3c699ad..44479ae 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.configuration.spring;
 
 import org.apache.ambari.server.api.stomp.TestController;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
@@ -46,7 +47,7 @@ public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
 
   @Bean
   public STOMPUpdateListener requestSTOMPListener(Injector injector) {
-    return new STOMPUpdateListener(injector);
+    return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES);
   }
 
   @Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
index 1a3de67..a49df55 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
@@ -26,7 +26,7 @@ import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.agent.stomp.AmbariSubscriptionRegistry;
 import org.apache.ambari.server.api.AmbariSendToMethodReturnValueHandler;
 import org.apache.ambari.server.events.DefaultMessageEmitter;
-import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,13 +63,14 @@ public class RootStompConfig {
   }
 
   @Bean
-  public STOMPUpdateListener requestSTOMPListener(Injector injector) {
-    return new STOMPUpdateListener(injector);
-  }
-
-  @Bean
-  public DefaultMessageEmitter defaultMessageSender(Injector injector) {
-    return new DefaultMessageEmitter(injector.getInstance(AgentSessionManager.class), brokerTemplate);
+  public DefaultMessageEmitter defaultMessageEmitter(Injector injector) {
+    org.apache.ambari.server.configuration.Configuration configuration =
+        injector.getInstance(org.apache.ambari.server.configuration.Configuration.class);
+    return new DefaultMessageEmitter(injector.getInstance(AgentSessionManager.class),
+        brokerTemplate,
+        injector.getInstance(AmbariEventPublisher.class),
+        configuration.getExecutionCommandsRetryCount(),
+        configuration.getExecutionCommandsRetryInterval());
   }
 
   @Bean
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index ca13d5b..bf639bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -239,6 +239,26 @@ public interface AmbariManagementController {
       throws AmbariException, AuthorizationException;
 
   /**
+   * Update the cluster identified by the given request object with the
+   * values carried by the given request object.
+   *
+   *
+   * @param requests          request objects which define which cluster to
+   *                          update and the values to set
+   * @param requestProperties request specific properties independent of resource
+   *
+   * @param fireAgentUpdates  whether agent updates (configurations, metadata, etc.) should be fired by this call
+   *
+   * @return a track action response
+   *
+   * @throws AmbariException thrown if the resource cannot be updated
+   * @throws AuthorizationException thrown if the authenticated user is not authorized to perform this operation
+   */
+  RequestStatusResponse updateClusters(Set<ClusterRequest> requests,
+                                              Map<String, String> requestProperties, boolean fireAgentUpdates)
+      throws AmbariException, AuthorizationException;
+
+  /**
    * Updates the groups specified.
    *
    * @param requests the groups to modify
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 3eca387..60c20d4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -1556,13 +1556,22 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
   public synchronized RequestStatusResponse updateClusters(Set<ClusterRequest> requests,
                                                            Map<String, String> requestProperties)
       throws AmbariException, AuthorizationException {
+    return updateClusters(requests, requestProperties, true);
+  }
+
+  @Override
+  @Transactional
+  public synchronized RequestStatusResponse updateClusters(Set<ClusterRequest> requests,
+                                                           Map<String, String> requestProperties,
+                                                           boolean fireAgentUpdates)
+      throws AmbariException, AuthorizationException {
 
     RequestStatusResponse response = null;
 
     // We have to allow for multiple requests to account for multiple
     // configuration updates (create multiple configuration resources)...
     for (ClusterRequest request : requests) {
-      response = updateCluster(request, requestProperties);
+      response = updateCluster(request, requestProperties, fireAgentUpdates);
     }
     return response;
   }
@@ -1661,7 +1670,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     return output;
   }
 
-  private synchronized RequestStatusResponse updateCluster(ClusterRequest request, Map<String, String> requestProperties)
+  private synchronized RequestStatusResponse updateCluster(ClusterRequest request,
+                                                           Map<String, String> requestProperties,
+                                                           boolean fireAgentUpdates
+  )
       throws AmbariException, AuthorizationException {
 
     RequestStageContainer requestStageContainer = null;
@@ -2012,7 +2024,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
         }
       }
     }
-    if (serviceConfigVersionResponse != null || nonServiceConfigsChanged) {
+    if (fireAgentUpdates && (serviceConfigVersionResponse != null || nonServiceConfigsChanged)) {
       configHelper.updateAgentConfigs(Collections.singleton(cluster.getClusterName()));
     }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
index bb18cc9..1d12377 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
@@ -190,7 +190,12 @@ public abstract class AmbariEvent {
     /**
      * Service credential store has been enabled or disabled..
      */
-    SERVICE_CREDENTIAL_STORE_UPDATE;
+    SERVICE_CREDENTIAL_STORE_UPDATE,
+
+    /**
+     * Message was not delivered to agent.
+     */
+    MESSAGE_NOT_DELIVERED;
   }
 
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index 739e464..f54b743 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -17,13 +17,18 @@
  */
 package org.apache.ambari.server.events;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.MessageDestinationIsNotDefinedException;
 import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.commons.lang.StringUtils;
 import org.springframework.messaging.simp.SimpMessagingTemplate;
 
 public class DefaultMessageEmitter extends MessageEmitter {
@@ -47,22 +52,55 @@ public class DefaultMessageEmitter extends MessageEmitter {
         put(STOMPEvent.Type.UPGRADE, "/events/upgrade");
         put(STOMPEvent.Type.AGENT_ACTIONS, "/agent_actions");
   }});
+  public static final Set<STOMPEvent.Type> DEFAULT_AGENT_EVENT_TYPES =
+      Collections.unmodifiableSet(new HashSet<STOMPEvent.Type>(Arrays.asList(
+        STOMPEvent.Type.METADATA,
+        STOMPEvent.Type.HOSTLEVELPARAMS,
+        STOMPEvent.Type.AGENT_TOPOLOGY,
+        STOMPEvent.Type.AGENT_CONFIGS,
+        STOMPEvent.Type.COMMAND,
+        STOMPEvent.Type.ALERT_DEFINITIONS,
+        STOMPEvent.Type.AGENT_ACTIONS
+  )));
+  public static final Set<STOMPEvent.Type> DEFAULT_API_EVENT_TYPES =
+      Collections.unmodifiableSet(new HashSet<STOMPEvent.Type>(Arrays.asList(
+        STOMPEvent.Type.ALERT,
+        STOMPEvent.Type.ALERT_GROUP,
+        STOMPEvent.Type.METADATA,
+        STOMPEvent.Type.UI_TOPOLOGY,
+        STOMPEvent.Type.CONFIGS,
+        STOMPEvent.Type.HOSTCOMPONENT,
+        STOMPEvent.Type.REQUEST,
+        STOMPEvent.Type.SERVICE,
+        STOMPEvent.Type.HOST,
+        STOMPEvent.Type.UI_ALERT_DEFINITIONS,
+        STOMPEvent.Type.UPGRADE
+  )));
 
-  public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
-    super(agentSessionManager, simpMessagingTemplate);
+  public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate,
+                               AmbariEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
+    super(agentSessionManager, simpMessagingTemplate, ambariEventPublisher, retryCount, retryInterval);
   }
 
   @Override
-  public void emitMessage(STOMPEvent event) throws AmbariException {
-    String destination = DEFAULT_DESTINATIONS.get(event.getType());
-    if (destination == null) {
+  public void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException {
+    if (StringUtils.isEmpty(getDestination(event))) {
       throw new MessageDestinationIsNotDefinedException(event.getType());
     }
     if (event instanceof STOMPHostEvent) {
       STOMPHostEvent hostUpdateEvent = (STOMPHostEvent) event;
-      emitMessageToHost(hostUpdateEvent, destination);
+      if (hostUpdateEvent.getType().equals(STOMPEvent.Type.COMMAND)) {
+        emitMessageRetriable((ExecutionCommandEvent) hostUpdateEvent);
+      } else {
+        emitMessageToHost(hostUpdateEvent);
+      }
     } else {
-      emitMessageToAll(event, destination);
+      emitMessageToAll(event);
     }
   }
+
+  @Override
+  protected String getDestination(STOMPEvent stompEvent) {
+    return DEFAULT_DESTINATIONS.get(stompEvent.getType());
+  }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
index 66ae977..740a135 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
@@ -22,13 +22,20 @@ package org.apache.ambari.server.events;
  * the server.
  */
 public class HostRegisteredEvent extends HostEvent {
+
+  private Long hostId;
   /**
    * Constructor.
    *
    * @param hostName
    */
-  public HostRegisteredEvent(String hostName) {
+  public HostRegisteredEvent(String hostName, Long hostId) {
     super(AmbariEventType.HOST_REGISTERED, hostName);
+    this.hostId = hostId;
+  }
+
+  public Long getHostId() {
+    return hostId;
   }
 
   /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
index cc5f2cd..1483b3f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -17,9 +17,24 @@
  */
 package org.apache.ambari.server.events;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.HostNotRegisteredException;
 import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.agent.stomp.dto.AckReport;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.messaging.MessageHeaders;
@@ -27,18 +42,42 @@ import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
 import org.springframework.messaging.simp.SimpMessageType;
 import org.springframework.messaging.simp.SimpMessagingTemplate;
 
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Is used to define a strategy for emitting message to subscribers.
  */
 public abstract class MessageEmitter {
   private final static Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);
+
   protected final AgentSessionManager agentSessionManager;
   protected final SimpMessagingTemplate simpMessagingTemplate;
+  private AmbariEventPublisher ambariEventPublisher;
+
+  protected final ScheduledExecutorService emittersExecutor = Executors.newScheduledThreadPool(10,
+      new ThreadFactoryBuilder().setNameFormat("agent-message-emitter-%d").build());
+  protected final ExecutorService monitorsExecutor = Executors.newFixedThreadPool(10,
+      new ThreadFactoryBuilder().setNameFormat("ambari-message-monitor-%d").build());
+
+  protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
+  protected ConcurrentHashMap<Long, ScheduledFuture> unconfirmedMessages = new ConcurrentHashMap<>();
+  protected ConcurrentHashMap<Long, BlockingQueue<ExecutionCommandEvent>> messagesToEmit = new ConcurrentHashMap<>();
+
+  // is used to cancel agent queue check on unregistering
+  protected ConcurrentHashMap<Long, Future> monitors = new ConcurrentHashMap<>();
 
+  public final int retryCount;
+  public final int retryInterval;
 
-  public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
+  public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate,
+                        AmbariEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
     this.agentSessionManager = agentSessionManager;
     this.simpMessagingTemplate = simpMessagingTemplate;
+    this.ambariEventPublisher = ambariEventPublisher;
+    this.retryCount = retryCount;
+    this.retryInterval = retryInterval;
+    ambariEventPublisher.register(this);
   }
 
   /**
@@ -46,7 +85,113 @@ public abstract class MessageEmitter {
    * @param event message should to be emitted.
    * @throws AmbariException
    */
-  abstract void emitMessage(STOMPEvent event) throws AmbariException;
+  abstract void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException;
+
+  public void emitMessageRetriable(ExecutionCommandEvent event) throws AmbariException, InterruptedException {
+    // set message identifier used to recognize NACK/ACK agent response
+    event.setMessageId(MESSAGE_ID.getAndIncrement());
+
+    Long hostId = event.getHostId();
+    if (!messagesToEmit.containsKey(hostId)) {
+      LOG.error("Trying to emit message to unregistered host with id {}", hostId);
+      return;
+    }
+    messagesToEmit.get(hostId).add(event);
+  }
+
+  private class MessagesToEmitMonitor implements Runnable {
+
+    private final Long hostId;
+
+    public MessagesToEmitMonitor(Long hostId) {
+      this.hostId = hostId;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          ExecutionCommandEvent event = messagesToEmit.get(hostId).take();
+          EmitMessageTask emitMessageTask = new EmitMessageTask(event);
+          ScheduledFuture scheduledFuture =
+              emittersExecutor.scheduleAtFixedRate(emitMessageTask,
+                  0, retryInterval, TimeUnit.SECONDS);
+          emitMessageTask.setScheduledFuture(scheduledFuture);
+          unconfirmedMessages.put(event.getMessageId(), scheduledFuture);
+
+          scheduledFuture.get();
+        } catch (InterruptedException e) {
+          // can be interrupted when no response was received from the agent and HEARTBEAT_LOST is about to be fired
+          return;
+        } catch (CancellationException e) {
+          // scheduled tasks can be canceled
+        } catch (ExecutionException e) {
+          LOG.error("Error during preparing command to emit", e);
+          // generate delivery failed event
+          ambariEventPublisher.publish(new MessageNotDelivered(hostId));
+          return;
+        }
+      }
+    }
+  }
+
+  public void processReceiveReport(AckReport ackReport) {
+    Long messageId = ackReport.getMessageId();
+    if (AckReport.AckStatus.OK.equals(ackReport.getStatus())) {
+      if (unconfirmedMessages.containsKey(messageId)) {
+        unconfirmedMessages.get(messageId).cancel(true);
+        unconfirmedMessages.remove(messageId);
+      } else {
+        LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
+      }
+    } else {
+      LOG.error("Received {} agent report for execution command with messageId {} with following reason: {}",
+          ackReport.getStatus(), messageId, ackReport.getReason());
+    }
+  }
+
+  private class EmitMessageTask implements Runnable {
+
+    private final ExecutionCommandEvent executionCommandEvent;
+    private ScheduledFuture scheduledFuture;
+    private int retry_counter = 0;
+
+    public EmitMessageTask(ExecutionCommandEvent executionCommandEvent) {
+      this.executionCommandEvent = executionCommandEvent;
+    }
+
+    public void setScheduledFuture(ScheduledFuture scheduledFuture) {
+      this.scheduledFuture = scheduledFuture;
+    }
+
+    @Override
+    public void run() {
+      if (retry_counter >= retryCount) {
+        // generate delivery failed event and cancel emitter
+        ambariEventPublisher.publish(new MessageNotDelivered(executionCommandEvent.getHostId()));
+        unconfirmedMessages.remove(executionCommandEvent.getMessageId()); //?
+
+        // remove commands queue for host
+        messagesToEmit.remove(executionCommandEvent.getHostId());
+
+        // cancel retrying to emit command
+        scheduledFuture.cancel(true);
+
+        // cancel checking for new commands for host
+        monitors.get(executionCommandEvent.getHostId()).cancel(true);
+        return;
+      }
+      try {
+        retry_counter++;
+        emitExecutionCommandToHost(executionCommandEvent);
+      } catch (AmbariException e) {
+        LOG.error("Error during emitting execution command with message id {} on attempt {}",
+            executionCommandEvent.getMessageId(), retry_counter, e);
+      }
+    }
+  }
+
+  protected abstract String getDestination(STOMPEvent stompEvent);
 
   /**
    * Creates STOMP message header.
@@ -54,33 +199,66 @@ public abstract class MessageEmitter {
    * @return message header.
    */
   protected MessageHeaders createHeaders(String sessionId) {
+    return createHeaders(sessionId, null);
+  }
+
+  /**
+   * Creates STOMP message header; a non-null messageId is added as the "messageId" native header.
+   * @param sessionId
+   * @return message header.
+   */
+  protected MessageHeaders createHeaders(String sessionId, Long messageId) {
     SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
     headerAccessor.setSessionId(sessionId);
     headerAccessor.setLeaveMutable(true);
+    if (messageId != null) {
+      headerAccessor.setNativeHeader("messageId", Long.toString(messageId));
+    }
     return headerAccessor.getMessageHeaders();
   }
 
   /**
    * Emits message to all subscribers.
    * @param event message should to be emitted.
-   * @param destination
    */
-  protected void emitMessageToAll(STOMPEvent event, String destination) {
+  protected void emitMessageToAll(STOMPEvent event) {
     LOG.debug("Received status update event {}", event);
-    simpMessagingTemplate.convertAndSend(destination, event);
+    simpMessagingTemplate.convertAndSend(getDestination(event), event);
   }
 
   /**
    * Emit message to specified host only.
    * @param event message should to be emitted.
-   * @param destination
    * @throws HostNotRegisteredException in case host is not registered.
    */
-  protected void emitMessageToHost(STOMPHostEvent event, String destination) throws HostNotRegisteredException {
+  protected void emitMessageToHost(STOMPHostEvent event) throws HostNotRegisteredException {
     Long hostId = event.getHostId();
     String sessionId = agentSessionManager.getSessionId(hostId);
     LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
     MessageHeaders headers = createHeaders(sessionId);
-    simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers);
+    simpMessagingTemplate.convertAndSendToUser(sessionId, getDestination(event), event, headers);
+  }
+
+  /**
+   * Emit execution command to specified host only.
+   * @param event message to be emitted.
+   * @throws HostNotRegisteredException in case host is not registered.
+   */
+  protected void emitExecutionCommandToHost(ExecutionCommandEvent event) throws HostNotRegisteredException {
+    Long hostId = event.getHostId();
+    Long messageId = event.getMessageId();
+    String sessionId = agentSessionManager.getSessionId(hostId);
+    LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
+    MessageHeaders headers = createHeaders(sessionId, messageId);
+    simpMessagingTemplate.convertAndSendToUser(sessionId, getDestination(event), event, headers);
+  }
+
+  @Subscribe
+  public void onHostRegister(HostRegisteredEvent hostRegisteredEvent) {
+    Long hostId = hostRegisteredEvent.getHostId();
+    if (!messagesToEmit.containsKey(hostId)) {
+      messagesToEmit.put(hostId, new LinkedBlockingQueue<>());
+      monitors.put(hostId, monitorsExecutor.submit(new MessagesToEmitMonitor(hostId)));
+    }
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageNotDelivered.java
similarity index 60%
copy from ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
copy to ambari-server/src/main/java/org/apache/ambari/server/events/MessageNotDelivered.java
index 66ae977..b7696fe 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageNotDelivered.java
@@ -17,28 +17,16 @@
  */
 package org.apache.ambari.server.events;
 
-/**
- * The {@link HostRegisteredEvent} class is fired when a host registered with
- * the server.
- */
-public class HostRegisteredEvent extends HostEvent {
-  /**
-   * Constructor.
-   *
-   * @param hostName
-   */
-  public HostRegisteredEvent(String hostName) {
-    super(AmbariEventType.HOST_REGISTERED, hostName);
+public class MessageNotDelivered extends AmbariEvent {
+
+  private final Long hostId;
+
+  public MessageNotDelivered(Long hostId) {
+    super(AmbariEventType.MESSAGE_NOT_DELIVERED);
+    this.hostId = hostId;
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public String toString() {
-    StringBuilder buffer = new StringBuilder("HostRegistered{ ");
-    buffer.append("hostName=").append(m_hostName);
-    buffer.append("}");
-    return buffer.toString();
+  public Long getHostId() {
+    return hostId;
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
index 15c3b1e..d7e2253 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
@@ -25,6 +25,11 @@ import java.beans.Transient;
 public abstract class STOMPEvent {
 
   /**
+   * Message id is unique for original messages, but the same for all re-emitted messages.
+   */
+  private Long messageId;
+
+  /**
    * Update type.
    */
   protected final Type type;
@@ -43,6 +48,15 @@ public abstract class STOMPEvent {
     return type.getMetricName();
   }
 
+  @Transient
+  public Long getMessageId() {
+    return messageId;
+  }
+
+  public void setMessageId(Long messageId) {
+    this.messageId = messageId;
+  }
+
   public enum Type {
     ALERT("events.alerts"),
     ALERT_GROUP("events.alert_group"),
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
index b5fae5e..fde7854 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
@@ -18,8 +18,10 @@
 
 package org.apache.ambari.server.events.listeners.requests;
 
+import java.util.Collections;
+import java.util.Set;
+
 import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.STOMPEvent;
 import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
@@ -30,21 +32,24 @@ import com.google.common.eventbus.Subscribe;
 import com.google.inject.Injector;
 
 public class STOMPUpdateListener {
-  private final AgentSessionManager agentSessionManager;
 
   @Autowired
   private DefaultMessageEmitter defaultMessageEmitter;
 
-  public STOMPUpdateListener(Injector injector) {
+  private final Set<STOMPEvent.Type> typesToProcess;
+
+  public STOMPUpdateListener(Injector injector, Set<STOMPEvent.Type> typesToProcess) {
     STOMPUpdatePublisher STOMPUpdatePublisher =
       injector.getInstance(STOMPUpdatePublisher.class);
-    agentSessionManager = injector.getInstance(AgentSessionManager.class);
     STOMPUpdatePublisher.register(this);
+    this.typesToProcess = typesToProcess == null ? Collections.emptySet() : typesToProcess;
   }
 
   @Subscribe
   @AllowConcurrentEvents
-  public void onUpdateEvent(STOMPEvent event) throws AmbariException {
-    defaultMessageEmitter.emitMessage(event);
+  public void onUpdateEvent(STOMPEvent event) throws AmbariException, InterruptedException {
+    if (typesToProcess.contains(event.getType())) {
+      defaultMessageEmitter.emitMessage(event);
+    }
   }
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 87a9858..a9fbe06 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -43,7 +43,6 @@ import org.apache.ambari.server.agent.stomp.MetadataHolder;
 import org.apache.ambari.server.agent.stomp.TopologyHolder;
 import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
 import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
-import org.apache.ambari.server.events.HostRegisteredEvent;
 import org.apache.ambari.server.events.HostsAddedEvent;
 import org.apache.ambari.server.events.HostsRemovedEvent;
 import org.apache.ambari.server.events.TopologyUpdateEvent;
@@ -537,10 +536,6 @@ public class ClustersImpl implements Clusters {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding a host to Clusters, hostname={}", hostname);
     }
-
-    // publish the event
-    HostRegisteredEvent event = new HostRegisteredEvent(hostname);
-    eventPublisher.publish(event);
   }
 
   @Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
index 170ad4c..9272e12 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
@@ -528,7 +528,7 @@ public class AmbariContext {
       RetryHelper.executeWithRetry(new Callable<Object>() {
         @Override
         public Object call() throws Exception {
-          getController().updateClusters(Collections.singleton(clusterRequest), null);
+          getController().updateClusters(Collections.singleton(clusterRequest), null, false);
           return null;
         }
       });
@@ -538,6 +538,15 @@ public class AmbariContext {
     }
   }
 
+  public void notifyAgentsAboutConfigsChanges(String clusterName) {
+    try {
+      configHelper.get().updateAgentConfigs(Collections.singleton(clusterName));
+    } catch (AmbariException e) {
+      LOG.error("Failed to set send agent updates: ", e);
+      throw new RuntimeException("Failed to set send agent updates: " + e, e);
+    }
+  }
+
   /**
    * Verifies that all desired configurations have reached the resolved state
    *   before proceeding with the install
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
index 92aecb3..985c290 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
@@ -469,6 +469,8 @@ public class ClusterConfigurationRequest {
       }
     }
 
+    ambariContext.notifyAgentsAboutConfigsChanges(clusterName);
+
     if (tag.equals(TopologyManager.TOPOLOGY_RESOLVED_TAG)) {
       // if this is a request to resolve config, then wait until resolution is completed
       try {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java
index 1fb61a3..7d4737b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -37,6 +38,7 @@ import org.apache.ambari.server.agent.DiskInfo;
 import org.apache.ambari.server.agent.HeartBeatHandler;
 import org.apache.ambari.server.agent.HostInfo;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -129,6 +131,7 @@ public class HostTest {
     ActionManager manager = mock(ActionManager.class);
     Injector injector = mock(Injector.class);
     doNothing().when(injector).injectMembers(any());
+    when(injector.getInstance(AmbariEventPublisher.class)).thenReturn(mock(AmbariEventPublisher.class));
     HeartBeatHandler handler = new HeartBeatHandler(clusters, manager, injector);
     String os = handler.getOsType("RedHat", "6.1");
     Assert.assertEquals("redhat6", os);

-- 
To stop receiving notification emails like this one, please contact
mpapirkovskyy@apache.org.