You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/06/11 10:25:47 UTC
[ambari] branch trunk updated: AMBARI-24062 Command retry on server
when agent-server connection drops. (#1497)
This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1662396 AMBARI-24062 Command retry on server when agent-server connection drops. (#1497)
1662396 is described below
commit 1662396ace59fdd8ae8e882308e76013c2fe7677
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Mon Jun 11 13:25:44 2018 +0300
AMBARI-24062 Command retry on server when agent-server connection drops. (#1497)
* AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
* AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
* AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
* AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
* AMBARI-24062. Command retry on server when agent-server connection drops. (mpapirkovskyy)
---
.../src/main/python/ambari_agent/Constants.py | 2 +-
.../ambari/server/agent/HeartBeatHandler.java | 9 +
.../ambari/server/agent/HeartbeatMonitor.java | 83 ++++++---
.../server/agent/stomp/AgentReportsController.java | 17 ++
.../stomp/dto/AckReport.java} | 68 +++++---
.../ambari/server/configuration/Configuration.java | 32 +++-
.../configuration/spring/AgentStompConfig.java | 8 +
.../configuration/spring/ApiStompConfig.java | 3 +-
.../configuration/spring/RootStompConfig.java | 17 +-
.../controller/AmbariManagementController.java | 20 +++
.../controller/AmbariManagementControllerImpl.java | 18 +-
.../apache/ambari/server/events/AmbariEvent.java | 7 +-
.../server/events/DefaultMessageEmitter.java | 52 +++++-
.../ambari/server/events/HostRegisteredEvent.java | 9 +-
.../ambari/server/events/MessageEmitter.java | 194 ++++++++++++++++++++-
...gisteredEvent.java => MessageNotDelivered.java} | 30 +---
.../apache/ambari/server/events/STOMPEvent.java | 14 ++
.../listeners/requests/STOMPUpdateListener.java | 17 +-
.../ambari/server/state/cluster/ClustersImpl.java | 5 -
.../ambari/server/topology/AmbariContext.java | 11 +-
.../topology/ClusterConfigurationRequest.java | 2 +
.../apache/ambari/server/state/host/HostTest.java | 3 +
22 files changed, 503 insertions(+), 118 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 09381eb..ed6b482 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -31,7 +31,7 @@ AGENT_ACTIONS_TOPIC = '/user/agent_actions'
PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, AGENT_ACTIONS_TOPIC]
POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC]
-AGENT_RESPONSES_TOPIC = '/agents/responses'
+AGENT_RESPONSES_TOPIC = '/reports/responses'
TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
METADATA_REQUEST_ENDPOINT = '/agents/metadata'
CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 918a8fa..7d70390 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -30,6 +30,8 @@ import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.events.AgentActionEvent;
+import org.apache.ambari.server.events.HostRegisteredEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
import org.apache.ambari.server.state.AgentVersion;
import org.apache.ambari.server.state.Cluster;
@@ -96,6 +98,9 @@ public class HeartBeatHandler {
private AgentSessionManager agentSessionManager;
@Inject
+ private AmbariEventPublisher ambariEventPublisher;
+
+ @Inject
private AlertHelper alertHelper;
private Map<String, Long> hostResponseIds = new ConcurrentHashMap<>();
@@ -347,6 +352,10 @@ public class HeartBeatHandler {
new AgentVersion(register.getAgentVersion()), now, register.getHardwareProfile(),
register.getAgentEnv(), register.getAgentStartTime()));
+ // publish the event
+ HostRegisteredEvent event = new HostRegisteredEvent(hostname, hostObject.getHostId());
+ ambariEventPublisher.publish(event);
+
RegistrationResponse response = new RegistrationResponse();
Long requestId = 0L;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 7961754..5a060f6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -40,6 +40,8 @@ import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.events.MessageNotDelivered;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.CommandScriptDefinition;
@@ -61,6 +63,7 @@ import org.apache.ambari.server.state.host.HostHeartbeatLostEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.eventbus.Subscribe;
import com.google.inject.Injector;
/**
@@ -78,6 +81,7 @@ public class HeartbeatMonitor implements Runnable {
private final AmbariManagementController ambariManagementController;
private final Configuration configuration;
private final AgentRequests agentRequests;
+ private final AmbariEventPublisher ambariEventPublisher;
public HeartbeatMonitor(Clusters clusters, ActionManager am,
int threadWakeupInterval, Injector injector) {
@@ -90,6 +94,8 @@ public class HeartbeatMonitor implements Runnable {
AmbariManagementController.class);
configuration = injector.getInstance(Configuration.class);
agentRequests = new AgentRequests();
+ ambariEventPublisher = injector.getInstance(AmbariEventPublisher.class);
+ ambariEventPublisher.register(this);
}
public void shutdown() {
@@ -142,43 +148,17 @@ public class HeartbeatMonitor implements Runnable {
//do not check if host already known be lost
continue;
}
- String host = hostObj.getHostName();
+ Long hostId = hostObj.getHostId();
HostState hostState = hostObj.getState();
- String hostname = hostObj.getHostName();
long lastHeartbeat = 0;
try {
- lastHeartbeat = clusters.getHost(host).getLastHeartbeatTime();
+ lastHeartbeat = clusters.getHostById(hostId).getLastHeartbeatTime();
} catch (AmbariException e) {
LOG.warn("Exception in getting host object; Is it fatal?", e);
}
if (lastHeartbeat + 2 * threadWakeupInterval < now) {
- LOG.warn("Heartbeat lost from host " + host);
- //Heartbeat is expired
- hostObj.handleEvent(new HostHeartbeatLostEvent(host));
-
- // mark all components that are not clients with unknown status
- for (Cluster cluster : clusters.getClustersForHost(hostObj.getHostName())) {
- for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostObj.getHostName())) {
- Service s = cluster.getService(sch.getServiceName());
- ServiceComponent sc = s.getServiceComponent(sch.getServiceComponentName());
- if (!sc.isClientComponent() &&
- !sch.getState().equals(State.INIT) &&
- !sch.getState().equals(State.INSTALLING) &&
- !sch.getState().equals(State.INSTALL_FAILED) &&
- !sch.getState().equals(State.UNINSTALLED) &&
- !sch.getState().equals(State.DISABLED)) {
- LOG.warn("Setting component state to UNKNOWN for component " + sc.getName() + " on " + host);
- State oldState = sch.getState();
- sch.setState(State.UNKNOWN);
- sch.setLastValidState(oldState);
- }
- }
- }
-
- //Purge action queue
- //notify action manager
- actionManager.handleLostHost(host);
+ handleHeartbeatLost(hostId);
}
if (hostState == HostState.WAITING_FOR_HOST_STATUS_UPDATES) {
long timeSpentInState = hostObj.getTimeInState();
@@ -343,4 +323,49 @@ public class HeartbeatMonitor implements Runnable {
return statusCmd;
}
+
+ private void handleHeartbeatLost(Long hostId) throws AmbariException, InvalidStateTransitionException {
+ Host hostObj = clusters.getHostById(hostId);
+ String host = hostObj.getHostName();
+ LOG.warn("Heartbeat lost from host " + host);
+ //Heartbeat is expired
+ hostObj.handleEvent(new HostHeartbeatLostEvent(host));
+
+ // mark all components that are not clients with unknown status
+ for (Cluster cluster : clusters.getClustersForHost(hostObj.getHostName())) {
+ for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostObj.getHostName())) {
+ Service s = cluster.getService(sch.getServiceName());
+ ServiceComponent sc = s.getServiceComponent(sch.getServiceComponentName());
+ if (!sc.isClientComponent() &&
+ !sch.getState().equals(State.INIT) &&
+ !sch.getState().equals(State.INSTALLING) &&
+ !sch.getState().equals(State.INSTALL_FAILED) &&
+ !sch.getState().equals(State.UNINSTALLED) &&
+ !sch.getState().equals(State.DISABLED)) {
+ LOG.warn("Setting component state to UNKNOWN for component " + sc.getName() + " on " + host);
+ State oldState = sch.getState();
+ sch.setState(State.UNKNOWN);
+ sch.setLastValidState(oldState);
+ }
+ }
+ }
+
+ //Purge action queue
+ //notify action manager
+ actionManager.handleLostHost(host);
+ }
+
+ @Subscribe
+ public void onMessageNotDelivered(MessageNotDelivered messageNotDelivered) {
+ try {
+ Host hostObj = clusters.getHostById(messageNotDelivered.getHostId());
+ if (hostObj.getState() == HostState.HEARTBEAT_LOST) {
+ //do not check if host is already known to be lost
+ return;
+ }
+ handleHeartbeatLost(messageNotDelivered.getHostId());
+ } catch (Exception e) {
+ LOG.error("Error during host to heartbeat lost moving", e);
+ }
+ }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 1ba0063..249a141 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -22,23 +22,28 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import javax.inject.Provider;
import javax.ws.rs.WebApplicationException;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.HostNotRegisteredException;
import org.apache.ambari.server.agent.AgentReport;
import org.apache.ambari.server.agent.AgentReportsProcessor;
import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ComponentStatus;
import org.apache.ambari.server.agent.HeartBeatHandler;
+import org.apache.ambari.server.agent.stomp.dto.AckReport;
import org.apache.ambari.server.agent.stomp.dto.CommandStatusReports;
import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReport;
import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReports;
import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
@@ -51,6 +56,10 @@ import com.google.inject.Injector;
@MessageMapping("/reports")
public class AgentReportsController {
private static final Logger LOG = LoggerFactory.getLogger(AgentReportsController.class);
+
+ @Autowired
+ private Provider<DefaultMessageEmitter> defaultMessageEmitterProvider;
+
private final HeartBeatHandler hh;
private final AgentSessionManager agentSessionManager;
private final AgentReportsProcessor agentReportsProcessor;
@@ -110,4 +119,12 @@ public class AgentReportsController {
return new ReportsResponse();
}
+ @MessageMapping("/responses")
+ public ReportsResponse handleReceiveReport(AckReport ackReport) throws HostNotRegisteredException {
+ LOG.debug("Handling agent receive report for execution message with messageId {}, status {}, reason {}",
+ ackReport.getMessageId(), ackReport.getStatus(), ackReport.getReason());
+ defaultMessageEmitterProvider.get().processReceiveReport(ackReport);
+ return new ReportsResponse();
+ }
+
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/AckReport.java
similarity index 51%
copy from ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
copy to ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/AckReport.java
index 66ae977..01a654d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/AckReport.java
@@ -15,30 +15,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.server.events;
+package org.apache.ambari.server.agent.stomp.dto;
-/**
- * The {@link HostRegisteredEvent} class is fired when a host registered with
- * the server.
- */
-public class HostRegisteredEvent extends HostEvent {
- /**
- * Constructor.
- *
- * @param hostName
- */
- public HostRegisteredEvent(String hostName) {
- super(AmbariEventType.HOST_REGISTERED, hostName);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString() {
- StringBuilder buffer = new StringBuilder("HostRegistered{ ");
- buffer.append("hostName=").append(m_hostName);
- buffer.append("}");
- return buffer.toString();
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class AckReport {
+
+ @JsonProperty("status")
+ private AckStatus status;
+
+ @JsonProperty("reason")
+ private String reason;
+
+ @JsonProperty("messageId")
+ private Long messageId;
+
+ public AckReport() {
+ }
+
+ public AckStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(AckStatus status) {
+ this.status = status;
+ }
+
+ public String getReason() {
+ return reason;
+ }
+
+ public void setReason(String reason) {
+ this.reason = reason;
+ }
+
+ public Long getMessageId() {
+ return messageId;
+ }
+
+ public void setMessageId(Long messageId) {
+ this.messageId = messageId;
+ }
+
+ public enum AckStatus {
+ OK,
+ ERROR
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 241edc2..8d1f520 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1866,14 +1866,28 @@ public class Configuration {
*/
@Markdown(description = "The maximum size of an incoming stomp text message. Default is 2 MB.")
public static final ConfigurationProperty<Integer> STOMP_MAX_INCOMING_MESSAGE_SIZE = new ConfigurationProperty<>(
- "stomp.max.message.size", 2*1024*1024);
+ "stomp.max_incoming.message.size", 2*1024*1024);
/**
* The maximum size of a buffer for stomp message sending. Default is 5 MB.
*/
@Markdown(description = "The maximum size of a buffer for stomp message sending. Default is 5 MB.")
public static final ConfigurationProperty<Integer> STOMP_MAX_BUFFER_MESSAGE_SIZE = new ConfigurationProperty<>(
- "stomp.max.message.size", 5*1024*1024);
+ "stomp.max_buffer.message.size", 5*1024*1024);
+
+ /**
+ * The number of attempts to emit execution command message to agent. Default is 4
+ */
+ @Markdown(description = "The number of attempts to emit execution command message to agent. Default is 4")
+ public static final ConfigurationProperty<Integer> EXECUTION_COMMANDS_RETRY_COUNT = new ConfigurationProperty<>(
+ "execution.command.retry.count", 4);
+
+ /**
+ * The interval in seconds between attempts to emit execution command message to agent. Default is 15
+ */
+ @Markdown(description = "The interval in seconds between attempts to emit execution command message to agent. Default is 15")
+ public static final ConfigurationProperty<Integer> EXECUTION_COMMANDS_RETRY_INTERVAL = new ConfigurationProperty<>(
+ "execution.command.retry.interval", 15);
/**
* The maximum number of threads used to extract Ambari Views when Ambari
@@ -4598,6 +4612,20 @@ public class Configuration {
}
/**
+ * @return the number of attempts to emit execution command message to agent. Default is 4
+ */
+ public int getExecutionCommandsRetryCount() {
+ return Integer.parseInt(getProperty(EXECUTION_COMMANDS_RETRY_COUNT));
+ }
+
+ /**
+ * @return the interval in seconds between attempts to emit execution command message to agent. Default is 15
+ */
+ public int getExecutionCommandsRetryInterval() {
+ return Integer.parseInt(getProperty(EXECUTION_COMMANDS_RETRY_INTERVAL));
+ }
+
+ /**
* @return max thread pool size for agents, default 25
*/
public int getAgentThreadPoolSize() {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
index 7084e8f..e1251d9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
@@ -21,8 +21,11 @@ import javax.servlet.ServletContext;
import org.apache.ambari.server.agent.stomp.HeartbeatController;
import org.apache.ambari.server.api.stomp.TestController;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
+import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -53,6 +56,11 @@ public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class);
}
+ @Bean
+ public STOMPUpdateListener requestSTOMPListener(Injector injector) {
+ return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES);
+ }
+
public DefaultHandshakeHandler getHandshakeHandler() {
WebSocketServerFactory webSocketServerFactory = new WebSocketServerFactory(servletContext);
webSocketServerFactory.getPolicy().setMaxTextMessageSize(configuration.getStompMaxIncomingMessageSize());
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
index 3c699ad..44479ae 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.configuration.spring;
import org.apache.ambari.server.api.stomp.TestController;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@@ -46,7 +47,7 @@ public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Bean
public STOMPUpdateListener requestSTOMPListener(Injector injector) {
- return new STOMPUpdateListener(injector);
+ return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES);
}
@Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
index 1a3de67..a49df55 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
@@ -26,7 +26,7 @@ import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.stomp.AmbariSubscriptionRegistry;
import org.apache.ambari.server.api.AmbariSendToMethodReturnValueHandler;
import org.apache.ambari.server.events.DefaultMessageEmitter;
-import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,13 +63,14 @@ public class RootStompConfig {
}
@Bean
- public STOMPUpdateListener requestSTOMPListener(Injector injector) {
- return new STOMPUpdateListener(injector);
- }
-
- @Bean
- public DefaultMessageEmitter defaultMessageSender(Injector injector) {
- return new DefaultMessageEmitter(injector.getInstance(AgentSessionManager.class), brokerTemplate);
+ public DefaultMessageEmitter defaultMessageEmitter(Injector injector) {
+ org.apache.ambari.server.configuration.Configuration configuration =
+ injector.getInstance(org.apache.ambari.server.configuration.Configuration.class);
+ return new DefaultMessageEmitter(injector.getInstance(AgentSessionManager.class),
+ brokerTemplate,
+ injector.getInstance(AmbariEventPublisher.class),
+ configuration.getExecutionCommandsRetryCount(),
+ configuration.getExecutionCommandsRetryInterval());
}
@Bean
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index ca13d5b..bf639bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -239,6 +239,26 @@ public interface AmbariManagementController {
throws AmbariException, AuthorizationException;
/**
+ * Update the cluster identified by the given request object with the
+ * values carried by the given request object.
+ *
+ *
+ * @param requests request objects which define which cluster to
+ * update and the values to set
+ * @param requestProperties request specific properties independent of resource
+ *
+ * @param fireAgentUpdates whether agent updates (configurations, metadata etc.) should be fired inside this call
+ *
+ * @return a track action response
+ *
+ * @throws AmbariException thrown if the resource cannot be updated
+ * @throws AuthorizationException thrown if the authenticated user is not authorized to perform this operation
+ */
+ RequestStatusResponse updateClusters(Set<ClusterRequest> requests,
+ Map<String, String> requestProperties, boolean fireAgentUpdates)
+ throws AmbariException, AuthorizationException;
+
+ /**
* Updates the groups specified.
*
* @param requests the groups to modify
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 3eca387..60c20d4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -1556,13 +1556,22 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
public synchronized RequestStatusResponse updateClusters(Set<ClusterRequest> requests,
Map<String, String> requestProperties)
throws AmbariException, AuthorizationException {
+ return updateClusters(requests, requestProperties, true);
+ }
+
+ @Override
+ @Transactional
+ public synchronized RequestStatusResponse updateClusters(Set<ClusterRequest> requests,
+ Map<String, String> requestProperties,
+ boolean fireAgentUpdates)
+ throws AmbariException, AuthorizationException {
RequestStatusResponse response = null;
// We have to allow for multiple requests to account for multiple
// configuration updates (create multiple configuration resources)...
for (ClusterRequest request : requests) {
- response = updateCluster(request, requestProperties);
+ response = updateCluster(request, requestProperties, fireAgentUpdates);
}
return response;
}
@@ -1661,7 +1670,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
return output;
}
- private synchronized RequestStatusResponse updateCluster(ClusterRequest request, Map<String, String> requestProperties)
+ private synchronized RequestStatusResponse updateCluster(ClusterRequest request,
+ Map<String, String> requestProperties,
+ boolean fireAgentUpdates
+ )
throws AmbariException, AuthorizationException {
RequestStageContainer requestStageContainer = null;
@@ -2012,7 +2024,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
}
}
- if (serviceConfigVersionResponse != null || nonServiceConfigsChanged) {
+ if (fireAgentUpdates && (serviceConfigVersionResponse != null || nonServiceConfigsChanged)) {
configHelper.updateAgentConfigs(Collections.singleton(cluster.getClusterName()));
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
index bb18cc9..1d12377 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
@@ -190,7 +190,12 @@ public abstract class AmbariEvent {
/**
* Service credential store has been enabled or disabled..
*/
- SERVICE_CREDENTIAL_STORE_UPDATE;
+ SERVICE_CREDENTIAL_STORE_UPDATE,
+
+ /**
+ * Message was not delivered to agent.
+ */
+ MESSAGE_NOT_DELIVERED;
}
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index 739e464..f54b743 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -17,13 +17,18 @@
*/
package org.apache.ambari.server.events;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.MessageDestinationIsNotDefinedException;
import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.commons.lang.StringUtils;
import org.springframework.messaging.simp.SimpMessagingTemplate;
public class DefaultMessageEmitter extends MessageEmitter {
@@ -47,22 +52,55 @@ public class DefaultMessageEmitter extends MessageEmitter {
put(STOMPEvent.Type.UPGRADE, "/events/upgrade");
put(STOMPEvent.Type.AGENT_ACTIONS, "/agent_actions");
}});
+ public static final Set<STOMPEvent.Type> DEFAULT_AGENT_EVENT_TYPES =
+ Collections.unmodifiableSet(new HashSet<STOMPEvent.Type>(Arrays.asList(
+ STOMPEvent.Type.METADATA,
+ STOMPEvent.Type.HOSTLEVELPARAMS,
+ STOMPEvent.Type.AGENT_TOPOLOGY,
+ STOMPEvent.Type.AGENT_CONFIGS,
+ STOMPEvent.Type.COMMAND,
+ STOMPEvent.Type.ALERT_DEFINITIONS,
+ STOMPEvent.Type.AGENT_ACTIONS
+ )));
+ public static final Set<STOMPEvent.Type> DEFAULT_API_EVENT_TYPES =
+ Collections.unmodifiableSet(new HashSet<STOMPEvent.Type>(Arrays.asList(
+ STOMPEvent.Type.ALERT,
+ STOMPEvent.Type.ALERT_GROUP,
+ STOMPEvent.Type.METADATA,
+ STOMPEvent.Type.UI_TOPOLOGY,
+ STOMPEvent.Type.CONFIGS,
+ STOMPEvent.Type.HOSTCOMPONENT,
+ STOMPEvent.Type.REQUEST,
+ STOMPEvent.Type.SERVICE,
+ STOMPEvent.Type.HOST,
+ STOMPEvent.Type.UI_ALERT_DEFINITIONS,
+ STOMPEvent.Type.UPGRADE
+ )));
- public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
- super(agentSessionManager, simpMessagingTemplate);
+ public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate,
+ AmbariEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
+ super(agentSessionManager, simpMessagingTemplate, ambariEventPublisher, retryCount, retryInterval);
}
@Override
- public void emitMessage(STOMPEvent event) throws AmbariException {
- String destination = DEFAULT_DESTINATIONS.get(event.getType());
- if (destination == null) {
+ public void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException {
+ if (StringUtils.isEmpty(getDestination(event))) {
throw new MessageDestinationIsNotDefinedException(event.getType());
}
if (event instanceof STOMPHostEvent) {
STOMPHostEvent hostUpdateEvent = (STOMPHostEvent) event;
- emitMessageToHost(hostUpdateEvent, destination);
+ if (hostUpdateEvent.getType().equals(STOMPEvent.Type.COMMAND)) {
+ emitMessageRetriable((ExecutionCommandEvent) hostUpdateEvent);
+ } else {
+ emitMessageToHost(hostUpdateEvent);
+ }
} else {
- emitMessageToAll(event, destination);
+ emitMessageToAll(event);
}
}
+
+ @Override
+ protected String getDestination(STOMPEvent stompEvent) {
+ return DEFAULT_DESTINATIONS.get(stompEvent.getType());
+ }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
index 66ae977..740a135 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
@@ -22,13 +22,20 @@ package org.apache.ambari.server.events;
* the server.
*/
public class HostRegisteredEvent extends HostEvent {
+
+ private Long hostId;
/**
* Constructor.
*
* @param hostName
*/
- public HostRegisteredEvent(String hostName) {
+ public HostRegisteredEvent(String hostName, Long hostId) {
super(AmbariEventType.HOST_REGISTERED, hostName);
+ this.hostId = hostId;
+ }
+
+ public Long getHostId() {
+ return hostId;
}
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
index cc5f2cd..1483b3f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -17,9 +17,24 @@
*/
package org.apache.ambari.server.events;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotRegisteredException;
import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.agent.stomp.dto.AckReport;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
@@ -27,18 +42,42 @@ import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Is used to define a strategy for emitting message to subscribers.
*/
public abstract class MessageEmitter {
private final static Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);
+
protected final AgentSessionManager agentSessionManager;
protected final SimpMessagingTemplate simpMessagingTemplate;
+  /** Publisher this emitter is registered with; also receives {@link MessageNotDelivered} events. */
+  private AmbariEventPublisher ambariEventPublisher;
+
+  /** Runs the periodic (re-)emit task of every execution command that is not yet confirmed. */
+  protected final ScheduledExecutorService emittersExecutor = Executors.newScheduledThreadPool(10,
+      new ThreadFactoryBuilder().setNameFormat("agent-message-emitter-%d").build());
+
+  /**
+   * Runs one long-lived queue monitor per registered host. A monitor loops until it is cancelled and
+   * therefore occupies its thread the whole time, so the pool must not be bounded: with a fixed pool
+   * of 10 threads the monitor of every host after the tenth would stay queued in the executor and the
+   * commands of that host would never be emitted.
+   */
+  protected final ExecutorService monitorsExecutor = Executors.newCachedThreadPool(
+      new ThreadFactoryBuilder().setNameFormat("ambari-message-monitor-%d").build());
+
+  /** Source of the message identifiers assigned to retriable execution commands. */
+  protected static final AtomicLong MESSAGE_ID = new AtomicLong(0);
+
+  /** Periodic emit tasks still waiting for an agent confirmation, keyed by message id. */
+  protected ConcurrentHashMap<Long, ScheduledFuture> unconfirmedMessages = new ConcurrentHashMap<>();
+
+  /** Execution commands waiting to be emitted, one queue per registered host id. */
+  protected ConcurrentHashMap<Long, BlockingQueue<ExecutionCommandEvent>> messagesToEmit = new ConcurrentHashMap<>();
+
+  // is used to cancel agent queue check on unregistering
+  protected ConcurrentHashMap<Long, Future> monitors = new ConcurrentHashMap<>();
+
+  /** Number of emit attempts made for a command before delivery is reported as failed. */
+  public final int retryCount;
+
+  /** Period, in seconds, between two emit attempts of the same command. */
+  public final int retryInterval;
+ /**
+ * Creates the emitter and registers it with the given event publisher.
+ *
+ * @param agentSessionManager used to look up the session id of a host
+ * @param simpMessagingTemplate template the messages are sent through
+ * @param ambariEventPublisher publisher this instance is registered with
+ * @param retryCount number of emit attempts for an execution command before delivery is reported as failed
+ * @param retryInterval period, in seconds, between emit attempts of the same command
+ */
- public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
+ public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate,
+ AmbariEventPublisher ambariEventPublisher, int retryCount, int retryInterval) {
this.agentSessionManager = agentSessionManager;
this.simpMessagingTemplate = simpMessagingTemplate;
+ this.ambariEventPublisher = ambariEventPublisher;
+ this.retryCount = retryCount;
+ this.retryInterval = retryInterval;
+ // makes the @Subscribe handlers of this instance receive published events
+ ambariEventPublisher.register(this);
}
/**
@@ -46,7 +85,113 @@ public abstract class MessageEmitter {
* @param event message should to be emitted.
* @throws AmbariException
*/
- abstract void emitMessage(STOMPEvent event) throws AmbariException;
+ abstract void emitMessage(STOMPEvent event) throws AmbariException, InterruptedException;
+
+  /**
+   * Queues an execution command for retriable delivery to the host it is addressed to.
+   * The command gets a fresh message id and is appended to the queue of its host; if the host
+   * has no queue (it is not registered), the command is dropped and an error is logged.
+   *
+   * @param event execution command to be emitted
+   * @throws AmbariException declared for callers; not thrown by this implementation
+   * @throws InterruptedException declared for callers; not thrown by this implementation
+   */
+  public void emitMessageRetriable(ExecutionCommandEvent event) throws AmbariException, InterruptedException {
+    // set message identifier used to recognize NACK/ACK agent response
+    event.setMessageId(MESSAGE_ID.getAndIncrement());
+
+    Long hostId = event.getHostId();
+    // Single lookup instead of containsKey() + get(): the queue of a host can be removed
+    // concurrently once its retries are exhausted, which would make the second call return null.
+    BlockingQueue<ExecutionCommandEvent> hostQueue = messagesToEmit.get(hostId);
+    if (hostQueue == null) {
+      LOG.error("Trying to emit message to unregistered host with id {}", hostId);
+      return;
+    }
+    hostQueue.add(event);
+  }
+
+  /**
+   * Per-host worker that takes execution commands from the queue of its host one at a time,
+   * schedules a periodic emit task for each and waits until that task is cancelled or fails
+   * before taking the next command.
+   */
+  private class MessagesToEmitMonitor implements Runnable {
+
+    private final Long hostId;
+
+    public MessagesToEmitMonitor(Long hostId) {
+      this.hostId = hostId;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          BlockingQueue<ExecutionCommandEvent> hostQueue = messagesToEmit.get(hostId);
+          if (hostQueue == null) {
+            // the queue is removed when delivery to the host was given up; nothing left to monitor
+            return;
+          }
+          ExecutionCommandEvent event = hostQueue.take();
+          EmitMessageTask emitMessageTask = new EmitMessageTask(event);
+          ScheduledFuture<?> scheduledFuture =
+              emittersExecutor.scheduleAtFixedRate(emitMessageTask,
+                  0, retryInterval, TimeUnit.SECONDS);
+          emitMessageTask.setScheduledFuture(scheduledFuture);
+          unconfirmedMessages.put(event.getMessageId(), scheduledFuture);
+
+          // a periodic task never completes normally: get() returns control only through
+          // cancellation, failure of the task or interruption of this thread
+          scheduledFuture.get();
+        } catch (InterruptedException e) {
+          // can be interrupted when no responses were received from agent and HEARTBEAT_LOST will be fired
+          Thread.currentThread().interrupt();
+          return;
+        } catch (CancellationException ignored) {
+          // scheduled tasks can be canceled; continue with the next queued command
+        } catch (ExecutionException e) {
+          LOG.error("Error during preparing command to emit", e);
+          // generate delivery failed event
+          ambariEventPublisher.publish(new MessageNotDelivered(hostId));
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Handles an agent report about a received execution command. An {@code OK} report cancels
+   * the pending re-emit task of the reported message; any other status is only logged.
+   *
+   * @param ackReport report sent by the agent for a message id
+   */
+  public void processReceiveReport(AckReport ackReport) {
+    Long messageId = ackReport.getMessageId();
+    if (AckReport.AckStatus.OK.equals(ackReport.getStatus())) {
+      // remove() both checks and claims the entry atomically, so concurrent duplicate reports
+      // cannot pass a containsKey() check together and then dereference a missing entry
+      ScheduledFuture<?> pendingEmit = unconfirmedMessages.remove(messageId);
+      if (pendingEmit != null) {
+        pendingEmit.cancel(true);
+      } else {
+        LOG.warn("OK agent report was received again for already complete command with message id {}", messageId);
+      }
+    } else {
+      LOG.error("Received {} agent report for execution command with messageId {} with following reason: {}",
+          ackReport.getStatus(), messageId, ackReport.getReason());
+    }
+  }
+
+  /**
+   * Periodic task that emits one execution command to its host. Once the allowed number of
+   * attempts has been used up, it reports the delivery as failed and stops both itself and the
+   * queue monitor of the host.
+   */
+  private class EmitMessageTask implements Runnable {
+
+    private final ExecutionCommandEvent executionCommandEvent;
+    // assigned by the thread that scheduled the task, read by the executor thread running it
+    private volatile ScheduledFuture scheduledFuture;
+    private int retryCounter = 0;
+
+    public EmitMessageTask(ExecutionCommandEvent executionCommandEvent) {
+      this.executionCommandEvent = executionCommandEvent;
+    }
+
+    public void setScheduledFuture(ScheduledFuture scheduledFuture) {
+      this.scheduledFuture = scheduledFuture;
+    }
+
+    @Override
+    public void run() {
+      if (retryCounter >= retryCount) {
+        Long hostId = executionCommandEvent.getHostId();
+
+        // generate delivery failed event and cancel emitter
+        ambariEventPublisher.publish(new MessageNotDelivered(hostId));
+        // no confirmation is expected for this message any more
+        unconfirmedMessages.remove(executionCommandEvent.getMessageId());
+
+        // remove commands queue for host
+        messagesToEmit.remove(hostId);
+
+        // cancel checking for new commands for host; done before cancelling this task so the
+        // monitor is interrupted instead of looping on to the queue that was just removed
+        Future<?> monitor = monitors.get(hostId);
+        if (monitor != null) {
+          monitor.cancel(true);
+        }
+
+        // cancel retrying to emit command
+        scheduledFuture.cancel(true);
+        return;
+      }
+      try {
+        retryCounter++;
+        emitExecutionCommandToHost(executionCommandEvent);
+      } catch (AmbariException e) {
+        LOG.error("Error during emitting execution command with message id {} on attempt {}",
+            executionCommandEvent.getMessageId(), retryCounter, e);
+      }
+    }
+  }
+
+ /**
+ * Resolves the destination the given event is sent to.
+ *
+ * @param stompEvent event to resolve the destination for
+ * @return destination handed to the messaging template when the event is emitted
+ */
+ protected abstract String getDestination(STOMPEvent stompEvent);
/**
* Creates STOMP message header.
@@ -54,33 +199,66 @@ public abstract class MessageEmitter {
* @return message header.
*/
protected MessageHeaders createHeaders(String sessionId) {
+ return createHeaders(sessionId, null);
+ }
+
+ /**
+ * Creates STOMP message header, optionally carrying a message identifier.
+ * @param sessionId session id set on the header
+ * @param messageId when not {@code null}, written as the {@code messageId} native header
+ * @return message header.
+ */
+ protected MessageHeaders createHeaders(String sessionId, Long messageId) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headerAccessor.setSessionId(sessionId);
headerAccessor.setLeaveMutable(true);
+ if (messageId != null) {
+ headerAccessor.setNativeHeader("messageId", Long.toString(messageId));
+ }
return headerAccessor.getMessageHeaders();
}
/**
* Emits message to all subscribers.
* @param event message should to be emitted.
- * @param destination
*/
- protected void emitMessageToAll(STOMPEvent event, String destination) {
+ protected void emitMessageToAll(STOMPEvent event) {
LOG.debug("Received status update event {}", event);
- simpMessagingTemplate.convertAndSend(destination, event);
+ simpMessagingTemplate.convertAndSend(getDestination(event), event);
}
/**
* Emit message to specified host only.
* @param event message should to be emitted.
- * @param destination
* @throws HostNotRegisteredException in case host is not registered.
*/
- protected void emitMessageToHost(STOMPHostEvent event, String destination) throws HostNotRegisteredException {
+ protected void emitMessageToHost(STOMPHostEvent event) throws HostNotRegisteredException {
Long hostId = event.getHostId();
String sessionId = agentSessionManager.getSessionId(hostId);
LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
MessageHeaders headers = createHeaders(sessionId);
- simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers);
+ simpMessagingTemplate.convertAndSendToUser(sessionId, getDestination(event), event, headers);
+ }
+
+  /**
+   * Sends an execution command to the one host it is addressed to; the message headers carry
+   * the message id of the command.
+   * @param event execution command to be emitted.
+   * @throws HostNotRegisteredException in case host is not registered.
+   */
+  protected void emitExecutionCommandToHost(ExecutionCommandEvent event) throws HostNotRegisteredException {
+    final Long targetHostId = event.getHostId();
+    final Long commandMessageId = event.getMessageId();
+    final String agentSessionId = agentSessionManager.getSessionId(targetHostId);
+    LOG.debug("Received status update event {} for host {} registered with session ID {}", event, targetHostId, agentSessionId);
+
+    final MessageHeaders messageHeaders = createHeaders(agentSessionId, commandMessageId);
+    final String destination = getDestination(event);
+    simpMessagingTemplate.convertAndSendToUser(agentSessionId, destination, event, messageHeaders);
+  }
+
+  /**
+   * Starts command delivery for a host when it registers: creates the command queue of the host
+   * and submits the monitor that emits queued commands. A host that already has a queue is left
+   * untouched.
+   *
+   * @param hostRegisteredEvent event carrying the id of the registered host
+   */
+  @Subscribe
+  public void onHostRegister(HostRegisteredEvent hostRegisteredEvent) {
+    Long hostId = hostRegisteredEvent.getHostId();
+    // putIfAbsent decides atomically whether this call created the queue, so a host can never
+    // end up with two monitors draining the same queue
+    if (messagesToEmit.putIfAbsent(hostId, new LinkedBlockingQueue<>()) == null) {
+      monitors.put(hostId, monitorsExecutor.submit(new MessagesToEmitMonitor(hostId)));
+    }
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageNotDelivered.java
similarity index 60%
copy from ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
copy to ambari-server/src/main/java/org/apache/ambari/server/events/MessageNotDelivered.java
index 66ae977..b7696fe 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostRegisteredEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageNotDelivered.java
@@ -17,28 +17,16 @@
*/
package org.apache.ambari.server.events;
-/**
- * The {@link HostRegisteredEvent} class is fired when a host registered with
- * the server.
- */
-public class HostRegisteredEvent extends HostEvent {
- /**
- * Constructor.
- *
- * @param hostName
- */
- public HostRegisteredEvent(String hostName) {
- super(AmbariEventType.HOST_REGISTERED, hostName);
+/**
+ * The {@link MessageNotDelivered} event carries the id of a host to which a message
+ * could not be delivered.
+ */
+public class MessageNotDelivered extends AmbariEvent {
+
+ /** Identifier of the host the undelivered message was addressed to. */
+ private final Long hostId;
+
+ /**
+ * Constructor.
+ *
+ * @param hostId identifier of the host the message was not delivered to
+ */
+ public MessageNotDelivered(Long hostId) {
+ super(AmbariEventType.MESSAGE_NOT_DELIVERED);
+ this.hostId = hostId;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public String toString() {
- StringBuilder buffer = new StringBuilder("HostRegistered{ ");
- buffer.append("hostName=").append(m_hostName);
- buffer.append("}");
- return buffer.toString();
+ /**
+ * @return identifier of the host the message was not delivered to
+ */
+ public Long getHostId() {
+ return hostId;
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
index 15c3b1e..d7e2253 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java
@@ -25,6 +25,11 @@ import java.beans.Transient;
public abstract class STOMPEvent {
/**
+ * Message id is unique for original messages, but the same for all re-emitted messages.
+ */
+ private Long messageId;
+
+ /**
* Update type.
*/
protected final Type type;
@@ -43,6 +48,15 @@ public abstract class STOMPEvent {
return type.getMetricName();
}
+ /**
+ * @return the message identifier, or {@code null} if none has been set
+ */
+ // NOTE(review): @Transient presumably keeps the id out of the serialized event body - confirm
+ @Transient
+ public Long getMessageId() {
+ return messageId;
+ }
+
+ /**
+ * @param messageId identifier to assign to this event
+ */
+ public void setMessageId(Long messageId) {
+ this.messageId = messageId;
+ }
+
public enum Type {
ALERT("events.alerts"),
ALERT_GROUP("events.alert_group"),
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
index b5fae5e..fde7854 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
@@ -18,8 +18,10 @@
package org.apache.ambari.server.events.listeners.requests;
+import java.util.Collections;
+import java.util.Set;
+
import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.events.DefaultMessageEmitter;
import org.apache.ambari.server.events.STOMPEvent;
import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
@@ -30,21 +32,24 @@ import com.google.common.eventbus.Subscribe;
import com.google.inject.Injector;
public class STOMPUpdateListener {
- private final AgentSessionManager agentSessionManager;
@Autowired
private DefaultMessageEmitter defaultMessageEmitter;
- public STOMPUpdateListener(Injector injector) {
+ /** Event types this listener forwards to the emitter; never {@code null}. */
+ private final Set<STOMPEvent.Type> typesToProcess;
+
+ /**
+ * Registers the listener with the {@link STOMPUpdatePublisher} obtained from the injector.
+ *
+ * @param injector used to obtain the publisher
+ * @param typesToProcess event types to forward; {@code null} is treated as an empty set
+ */
+ public STOMPUpdateListener(Injector injector, Set<STOMPEvent.Type> typesToProcess) {
+ // assigned before registration: the handler allows concurrent events, so an event may
+ // arrive as soon as register(this) returns and must not see an unset field
+ this.typesToProcess = typesToProcess == null ? Collections.emptySet() : typesToProcess;
STOMPUpdatePublisher STOMPUpdatePublisher =
injector.getInstance(STOMPUpdatePublisher.class);
- agentSessionManager = injector.getInstance(AgentSessionManager.class);
STOMPUpdatePublisher.register(this);
}
@Subscribe
@AllowConcurrentEvents
- public void onUpdateEvent(STOMPEvent event) throws AmbariException {
- defaultMessageEmitter.emitMessage(event);
+ public void onUpdateEvent(STOMPEvent event) throws AmbariException, InterruptedException {
+ // events whose type is not in typesToProcess are ignored by this listener
+ if (typesToProcess.contains(event.getType())) {
+ defaultMessageEmitter.emitMessage(event);
+ }
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 87a9858..a9fbe06 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -43,7 +43,6 @@ import org.apache.ambari.server.agent.stomp.MetadataHolder;
import org.apache.ambari.server.agent.stomp.TopologyHolder;
import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
-import org.apache.ambari.server.events.HostRegisteredEvent;
import org.apache.ambari.server.events.HostsAddedEvent;
import org.apache.ambari.server.events.HostsRemovedEvent;
import org.apache.ambari.server.events.TopologyUpdateEvent;
@@ -537,10 +536,6 @@ public class ClustersImpl implements Clusters {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding a host to Clusters, hostname={}", hostname);
}
-
- // publish the event
- HostRegisteredEvent event = new HostRegisteredEvent(hostname);
- eventPublisher.publish(event);
}
@Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
index 170ad4c..9272e12 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java
@@ -528,7 +528,7 @@ public class AmbariContext {
RetryHelper.executeWithRetry(new Callable<Object>() {
@Override
public Object call() throws Exception {
- getController().updateClusters(Collections.singleton(clusterRequest), null);
+ getController().updateClusters(Collections.singleton(clusterRequest), null, false);
return null;
}
});
@@ -538,6 +538,15 @@ public class AmbariContext {
}
}
+  /**
+   * Asks the config helper to update the agent configs of the given cluster.
+   *
+   * @param clusterName name of the cluster whose agent configs are updated
+   * @throws RuntimeException wrapping the {@link AmbariException} raised by the update
+   */
+  public void notifyAgentsAboutConfigsChanges(String clusterName) {
+    try {
+      configHelper.get().updateAgentConfigs(Collections.singleton(clusterName));
+    } catch (AmbariException updateFailure) {
+      LOG.error("Failed to set send agent updates: ", updateFailure);
+      throw new RuntimeException("Failed to set send agent updates: " + updateFailure, updateFailure);
+    }
+  }
+
/**
* Verifies that all desired configurations have reached the resolved state
* before proceeding with the install
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
index 92aecb3..985c290 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/ClusterConfigurationRequest.java
@@ -469,6 +469,8 @@ public class ClusterConfigurationRequest {
}
}
+ ambariContext.notifyAgentsAboutConfigsChanges(clusterName);
+
if (tag.equals(TopologyManager.TOPOLOGY_RESOLVED_TAG)) {
// if this is a request to resolve config, then wait until resolution is completed
try {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java
index 1fb61a3..7d4737b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/host/HostTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -37,6 +38,7 @@ import org.apache.ambari.server.agent.DiskInfo;
import org.apache.ambari.server.agent.HeartBeatHandler;
import org.apache.ambari.server.agent.HostInfo;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
@@ -129,6 +131,7 @@ public class HostTest {
ActionManager manager = mock(ActionManager.class);
Injector injector = mock(Injector.class);
doNothing().when(injector).injectMembers(any());
+ when(injector.getInstance(AmbariEventPublisher.class)).thenReturn(mock(AmbariEventPublisher.class));
HeartBeatHandler handler = new HeartBeatHandler(clusters, manager, injector);
String os = handler.getOsType("RedHat", "6.1");
Assert.assertEquals("redhat6", os);
--
To stop receiving notification emails like this one, please contact
mpapirkovskyy@apache.org.