You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/10/05 10:46:39 UTC
[4/4] ambari git commit: AMBARI-22063. Poor performance of STOMP
subscriptions cache and registration handling. (mpapirkovskyy)
AMBARI-22063. Poor performance of STOMP subscriptions cache and registration handling. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6ecac18c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6ecac18c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6ecac18c
Branch: refs/heads/branch-3.0-perf
Commit: 6ecac18cb5668ec3b74fe69751dc54d65f33f698
Parents: f0def7c
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Oct 5 13:45:13 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Thu Oct 5 13:45:13 2017 +0300
----------------------------------------------------------------------
.../server/HostNotRegisteredException.java | 4 +-
.../server/actionmanager/ActionScheduler.java | 20 +-
.../apache/ambari/server/agent/AgentReport.java | 53 ++
.../server/agent/AgentReportsProcessor.java | 100 ++++
.../server/agent/AgentSessionManager.java | 20 +-
.../ambari/server/agent/HeartBeatHandler.java | 121 +----
.../ambari/server/agent/HeartbeatMonitor.java | 2 +
.../ambari/server/agent/HeartbeatProcessor.java | 49 +-
.../agent/stomp/AgentClusterDataHolder.java | 27 +-
.../server/agent/stomp/AgentConfigsHolder.java | 20 +-
.../agent/stomp/AgentCurrentDataController.java | 19 +-
.../server/agent/stomp/AgentHostDataHolder.java | 40 +-
.../agent/stomp/AgentReportsController.java | 24 +-
.../agent/stomp/AgentsRegistrationQueue.java | 76 +++
.../agent/stomp/AlertDefinitionsHolder.java | 40 +-
.../agent/stomp/AmbariSubscriptionRegistry.java | 536 +++++++++++++++++++
.../server/agent/stomp/HeartbeatController.java | 150 ++++--
.../agent/stomp/HostLevelParamsHolder.java | 10 +-
.../server/agent/stomp/MetadataHolder.java | 6 +-
.../server/agent/stomp/dto/ClusterConfigs.java | 16 +-
.../ambari/server/agent/stomp/dto/Hash.java | 6 +-
.../server/agent/stomp/dto/MetadataCluster.java | 6 +-
.../agent/stomp/dto/MetadataServiceInfo.java | 6 +-
.../server/agent/stomp/dto/TopologyCluster.java | 6 +-
.../agent/stomp/dto/TopologyComponent.java | 6 +-
.../server/agent/stomp/dto/TopologyHost.java | 6 +-
.../server/configuration/Configuration.java | 86 +++
.../spring/AgentRegisteringQueueChecker.java | 55 ++
.../configuration/spring/AgentStompConfig.java | 7 +-
.../configuration/spring/GuiceBeansConfig.java | 12 +
.../configuration/spring/RootStompConfig.java | 35 +-
.../controller/AmbariManagementController.java | 3 +
.../AmbariManagementControllerImpl.java | 49 +-
.../ambari/server/controller/AmbariServer.java | 13 +-
.../internal/HostResourceProvider.java | 8 +-
.../state/DefaultServiceCalculatedState.java | 2 +-
.../state/FlumeServiceCalculatedState.java | 2 +-
.../state/HBaseServiceCalculatedState.java | 2 +-
.../state/HDFSServiceCalculatedState.java | 2 +-
.../state/HiveServiceCalculatedState.java | 2 +-
.../state/OozieServiceCalculatedState.java | 2 +-
.../state/YARNServiceCalculatedState.java | 2 +-
.../server/events/AgentConfigsUpdateEvent.java | 20 +-
.../events/AlertDefinitionsUpdateEvent.java | 12 +-
.../server/events/AmbariHostUpdateEvent.java | 6 +-
.../server/events/ConfigsUpdateEvent.java | 39 --
.../server/events/ExecutionCommandEvent.java | 26 +-
.../events/HostLevelParamsUpdateEvent.java | 14 +-
.../server/events/ServiceUpdateEvent.java | 6 +-
.../listeners/requests/StateUpdateListener.java | 6 +-
.../services/ServiceUpdateListener.java | 29 +-
.../listeners/tasks/TaskStatusListener.java | 33 +-
.../publishers/AgentCommandsPublisher.java | 48 +-
.../BufferedUpdateEventPublisher.java | 73 +++
.../HostComponentUpdateEventPublisher.java | 41 +-
.../publishers/ServiceUpdateEventPublisher.java | 68 +++
.../publishers/StateUpdateEventPublisher.java | 11 +-
.../orm/dao/HostComponentDesiredStateDAO.java | 34 +-
.../dao/ServiceComponentDesiredStateDAO.java | 37 ++
.../HostComponentDesiredStateEntity.java | 8 +-
.../orm/entities/HostComponentStateEntity.java | 18 +
.../ambari/server/state/ConfigHelper.java | 40 +-
.../org/apache/ambari/server/state/Host.java | 8 +
.../server/state/ServiceComponentHost.java | 6 +
.../state/ServiceComponentHostFactory.java | 4 +
.../server/state/ServiceComponentImpl.java | 19 +-
.../server/state/alert/AlertDefinitionHash.java | 3 +-
.../server/state/cluster/ClusterImpl.java | 24 +-
.../ambari/server/state/host/HostImpl.java | 115 +++-
.../svccomphost/ServiceComponentHostImpl.java | 84 ++-
.../server/topology/TopologyDeleteFormer.java | 7 +-
.../server/upgrade/UpgradeCatalog300.java | 29 +
.../resources/Ambari-DDL-AzureDB-CREATE.sql | 1 +
.../main/resources/Ambari-DDL-Derby-CREATE.sql | 1 +
.../main/resources/Ambari-DDL-MySQL-CREATE.sql | 1 +
.../main/resources/Ambari-DDL-Oracle-CREATE.sql | 1 +
.../resources/Ambari-DDL-Postgres-CREATE.sql | 1 +
.../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 1 +
.../resources/Ambari-DDL-SQLServer-CREATE.sql | 1 +
.../actionmanager/TestActionScheduler.java | 142 +++--
.../server/agent/AgentSessionManagerTest.java | 25 +-
.../listeners/tasks/TaskStatusListenerTest.java | 23 +-
.../server/upgrade/UpgradeCatalog300Test.java | 11 +
83 files changed, 2066 insertions(+), 661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
index eadd5f1..82d42cd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
@@ -27,8 +27,8 @@ public class HostNotRegisteredException extends AmbariException {
return new HostNotRegisteredException(String.format("Host with sessionId '%s' not registered", sessionId));
}
- public static HostNotRegisteredException forHostName(String hostName) {
- return new HostNotRegisteredException(String.format("Host with hostName '%s' not registered", hostName));
+ public static HostNotRegisteredException forHostId(Long hostId) {
+ return new HostNotRegisteredException(String.format("Host with hostId '%s' not registered", hostId));
}
private HostNotRegisteredException(String message) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 595edcd..c41dd01 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -242,7 +242,7 @@ class ActionScheduler implements Runnable {
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
Configuration configuration, Provider<EntityManager> entityManagerProvider,
HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
- RoleCommandOrderProvider roleCommandOrderProvider) {
+ RoleCommandOrderProvider roleCommandOrderProvider, AgentCommandsPublisher agentCommandsPublisher) {
sleepTime = sleepTimeMilliSec;
actionTimeout = actionTimeoutMilliSec;
@@ -259,6 +259,7 @@ class ActionScheduler implements Runnable {
this.hostRoleCommandFactory = hostRoleCommandFactory;
jpaPublisher = null;
this.roleCommandOrderProvider = roleCommandOrderProvider;
+ this.agentCommandsPublisher = agentCommandsPublisher;
serverActionExecutor = new ServerActionExecutor(db, sleepTime);
initializeCaches();
@@ -284,11 +285,12 @@ class ActionScheduler implements Runnable {
ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
Configuration configuration, Provider<EntityManager> entityManagerProvider,
- HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) {
+ HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
+ AgentCommandsPublisher agentCommandsPublisher) {
this(sleepTimeMilliSec, actionTimeoutMilliSec, db, actionQueue, fsmObject, maxAttempts, hostsMap, unitOfWork,
ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory,
- null);
+ null, agentCommandsPublisher);
}
/**
@@ -456,7 +458,7 @@ class ActionScheduler implements Runnable {
// Commands that will be scheduled in current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
- Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
+ Multimap<Long, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
Map<String, RoleStats> roleStats =
processInProgressStage(stage, commandsToSchedule, commandsToEnqueue);
@@ -559,7 +561,7 @@ class ActionScheduler implements Runnable {
if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
serverActionExecutor.awake();
} else {
- commandsToEnqueue.put(cmd.getHostname(), cmd);
+ commandsToEnqueue.put(clusters.getHost(cmd.getHostname()).getHostId(), cmd);
}
}
agentCommandsPublisher.sendAgentCommand(commandsToEnqueue);
@@ -746,7 +748,7 @@ class ActionScheduler implements Runnable {
* whether stage has succeeded or failed
*/
protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
- Multimap<String, AgentCommand> commandsToEnqueue) throws AmbariException {
+ Multimap<Long, AgentCommand> commandsToEnqueue) throws AmbariException {
LOG.debug("==> Collecting commands to schedule...");
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
@@ -1274,7 +1276,7 @@ class ActionScheduler implements Runnable {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason(reason);
- agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostName(), cancelCommand);
+ agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostId(), cancelCommand);
}
}
@@ -1294,7 +1296,7 @@ class ActionScheduler implements Runnable {
}
}
- void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<String, AgentCommand> commandsToEnqueue) {
+ void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<Long, AgentCommand> commandsToEnqueue) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
// There are no server actions in actionQueue
if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
@@ -1303,7 +1305,7 @@ class ActionScheduler implements Runnable {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason("Stage timeout");
- commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand);
+ commandsToEnqueue.put(hostRoleCommand.getHostId(), cancelCommand);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
new file mode 100644
index 0000000..817a238
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReport.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.List;
+
+import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
+
+/** Bundles a host name with three report payloads; references are fixed at construction, lists are stored uncopied. */
+public class AgentReport {
+ private final String hostName;
+ private final List<ComponentStatus> componentStatuses;
+ private final List<CommandReport> reports; // exposed through getCommandReports()
+ private final HostStatusReport hostStatusReport;
+
+ public AgentReport(String hostName, List<ComponentStatus> componentStatuses, List<CommandReport> reports, HostStatusReport hostStatusReport) {
+ this.hostName = hostName;
+ this.componentStatuses = componentStatuses;
+ this.reports = reports;
+ this.hostStatusReport = hostStatusReport;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public List<ComponentStatus> getComponentStatuses() {
+ return componentStatuses;
+ }
+
+ public List<CommandReport> getCommandReports() {
+ return reports;
+ }
+
+ public HostStatusReport getHostStatusReport() {
+ return hostStatusReport;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
new file mode 100644
index 0000000..586a16e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentReportsProcessor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.persist.UnitOfWork;
+
+/** Buffers reports received from agents in a queue; scheduled worker tasks drain it into {@link HeartBeatHandler}. */
+@Singleton
+public class AgentReportsProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(AgentReportsProcessor.class);
+
+ private ScheduledExecutorService executor;
+
+ private ConcurrentLinkedQueue<AgentReport> agentReportsQueue = new ConcurrentLinkedQueue<>();
+
+ /** Appends a report to the queue; it is handled later by one of the scheduled tasks. */
+ public void addAgentReport(AgentReport agentReport) {
+ agentReportsQueue.add(agentReport);
+ }
+
+ @Inject
+ private HeartBeatHandler hh;
+
+ @Inject
+ private UnitOfWork unitOfWork;
+
+ /** Takes Configuration as an argument: Guice sets {@code @Inject} fields only after the constructor has returned. */
+ @Inject
+ public AgentReportsProcessor(Configuration configuration) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("agent-report-processor-%d").build();
+ int poolSize = configuration.getAgentsReportThreadPoolSize();
+ executor = Executors.newScheduledThreadPool(poolSize, threadFactory);
+ // One periodic task per pool thread; initial delay and period are both given in seconds.
+ for (int i = 0; i < poolSize; i++) {
+ executor.scheduleAtFixedRate(new AgentReportProcessingTask(),
+ configuration.getAgentsReportProcessingStartTimeout(),
+ configuration.getAgentsReportProcessingPeriod(), TimeUnit.SECONDS);
+ }
+ }
+
+ /** Empties the queue inside one unit of work, dispatching each report by its first non-null payload. */
+ private class AgentReportProcessingTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ unitOfWork.begin();
+ while (true) {
+ AgentReport agentReport = agentReportsQueue.poll();
+ if (agentReport == null) {
+ break;
+ }
+ String hostName = agentReport.getHostName();
+ try {
+ //TODO rewrite with polymorphism usage.
+ if (agentReport.getCommandReports() != null) {
+ hh.handleCommandReportStatus(agentReport.getCommandReports(), hostName);
+ } else if (agentReport.getComponentStatuses() != null) {
+ hh.handleComponentReportStatus(agentReport.getComponentStatuses(), hostName);
+ } else if (agentReport.getHostStatusReport() != null) {
+ hh.handleHostReportStatus(agentReport.getHostStatusReport(), hostName);
+ }
+ } catch (AmbariException e) {
+ LOG.error("Error processing agent reports", e);
+ }
+ }
+ } finally {
+ unitOfWork.end();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
index 3040f55..2f435bb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
@@ -30,14 +30,14 @@ import com.google.inject.Singleton;
public class AgentSessionManager {
private final ConcurrentMap<String, Host> registeredHosts = new ConcurrentHashMap<>(); // session ID -> host
- private final ConcurrentMap<String, String> registeredSessionIds = new ConcurrentHashMap<>(); // hostname -> session ID
+ private final ConcurrentMap<Long, String> registeredSessionIds = new ConcurrentHashMap<>();
public void register(String sessionId, Host host) {
Preconditions.checkNotNull(sessionId);
Preconditions.checkNotNull(host);
- Preconditions.checkNotNull(host.getHostName());
+ Preconditions.checkNotNull(host.getHostId());
- String oldSessionId = registeredSessionIds.put(host.getHostName(), sessionId);
+ String oldSessionId = registeredSessionIds.put(host.getHostId(), sessionId);
if (oldSessionId != null) {
registeredHosts.remove(oldSessionId);
}
@@ -59,21 +59,21 @@ public class AgentSessionManager {
throw HostNotRegisteredException.forSessionId(sessionId);
}
- public String getSessionId(String hostName) throws HostNotRegisteredException {
- Preconditions.checkNotNull(hostName);
+ public String getSessionId(Long hostId) throws HostNotRegisteredException {
+ Preconditions.checkNotNull(hostId);
- String sessionId = registeredSessionIds.get(hostName);
+ String sessionId = registeredSessionIds.get(hostId);
if (sessionId != null) {
return sessionId;
}
- throw HostNotRegisteredException.forHostName(hostName);
+ throw HostNotRegisteredException.forHostId(hostId);
}
- public void unregisterByHost(String hostName) {
- Preconditions.checkNotNull(hostName);
+ public void unregisterByHost(Long hostId) {
+ Preconditions.checkNotNull(hostId);
- String sessionId = registeredSessionIds.remove(hostName);
+ String sessionId = registeredSessionIds.remove(hostId);
if (sessionId != null) {
registeredHosts.remove(sessionId);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 00d469f..ee6e05c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -17,11 +17,9 @@
*/
package org.apache.ambari.server.agent;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
@@ -42,7 +40,6 @@ import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.StackId;
-import org.apache.ambari.server.state.alert.AlertDefinition;
import org.apache.ambari.server.state.alert.AlertDefinitionHash;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent;
@@ -190,6 +187,15 @@ public class HeartBeatHandler {
hostResponseIds.put(hostname, currentResponseId);
hostResponses.put(hostname, response);
+ // If the host is waiting for component status updates, notify it
+ if (hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
+ try {
+ LOG.debug("Got component status updates for host {}", hostname);
+ hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
+ } catch (InvalidStateTransitionException e) {
+ LOG.warn("Failed to notify the host {} about component status updates", hostname, e);
+ }
+ }
if (heartbeat.getRecoveryReport() != null) {
RecoveryReport rr = heartbeat.getRecoveryReport();
processRecoveryReport(rr, hostname);
@@ -204,27 +210,6 @@ public class HeartBeatHandler {
return createRegisterCommand();
}
- /*
- * A host can belong to only one cluster. Though getClustersForHost(hostname)
- * returns a set of clusters, it will have only one entry.
- *
- * TODO: Handle the case when a host is a part of multiple clusters.
- */
- Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
-
- if (clusters.size() > 0) {
- String clusterName = clusters.iterator().next().getClusterName();
-
- if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
- RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
- response.setRecoveryConfig(rc);
-
- if (response.getRecoveryConfig() != null) {
- LOG.debug("Recovery configuration set to {}", response.getRecoveryConfig().toString());
- }
- }
- }
-
heartbeatProcessor.addHeartbeat(heartbeat);
// Send commands if node is active
@@ -253,7 +238,7 @@ public class HeartBeatHandler {
} catch (InvalidStateTransitionException ex) {
LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
host.setState(HostState.INIT);
- agentSessionManager.unregisterByHost(hostname);
+ agentSessionManager.unregisterByHost(host.getHostId());
}
}
@@ -338,19 +323,11 @@ public class HeartBeatHandler {
}
// Resetting host state
- hostObject.setState(HostState.INIT);
+ hostObject.setStateMachineState(HostState.INIT);
// Set ping port for agent
hostObject.setCurrentPingPort(currentPingPort);
- // Get status of service components
- List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname);
-
- // Add request for component version
- for (StatusCommand command: cmds) {
- command.getCommandParams().put("request_version", String.valueOf(true));
- }
-
// Save the prefix of the log file paths
hostObject.setPrefix(register.getPrefix());
@@ -360,43 +337,6 @@ public class HeartBeatHandler {
register.getAgentEnv()));
RegistrationResponse response = new RegistrationResponse();
- if (cmds.isEmpty()) {
- //No status commands needed let the fsm know that status step is done
- hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname,
- now));
- }
-
- response.setStatusCommands(cmds);
-
- response.setResponseStatus(RegistrationStatus.OK);
-
- // force the registering agent host to receive its list of alert definitions
- List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
- response.setAlertDefinitionCommands(alertDefinitionCommands);
-
- response.setAgentConfig(config.getAgentConfigsMap());
- if(response.getAgentConfig() != null) {
- LOG.debug("Agent configuration map set to {}", response.getAgentConfig());
- }
-
- /*
- * A host can belong to only one cluster. Though getClustersForHost(hostname)
- * returns a set of clusters, it will have only one entry.
- *
- * TODO: Handle the case when a host is a part of multiple clusters.
- */
- Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
-
- if (clusters.size() > 0) {
- String clusterName = clusters.iterator().next().getClusterName();
-
- RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
- response.setRecoveryConfig(rc);
-
- if(response.getRecoveryConfig() != null) {
- LOG.info("Recovery configuration set to " + response.getRecoveryConfig());
- }
- }
Long requestId = 0L;
hostResponseIds.put(hostname, requestId);
@@ -463,45 +403,6 @@ public class HeartBeatHandler {
return response;
}
- /**
- * Gets the {@link AlertDefinitionCommand} instances that need to be sent for
- * each cluster that the registering host is a member of.
- *
- * @param hostname
- * @return
- * @throws AmbariException
- */
- private List<AlertDefinitionCommand> getRegistrationAlertDefinitionCommands(
- String hostname) throws AmbariException {
-
- Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname);
- if (null == hostClusters || hostClusters.size() == 0) {
- return null;
- }
-
- List<AlertDefinitionCommand> commands = new ArrayList<>();
-
- // for every cluster this host is a member of, build the command
- for (Cluster cluster : hostClusters) {
- String clusterName = cluster.getClusterName();
- alertDefinitionHash.invalidate(clusterName, hostname);
-
- List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
- clusterName, hostname);
-
- String hash = alertDefinitionHash.getHash(clusterName, hostname);
- Host host = cluster.getHost(hostname);
- String publicHostName = host == null? hostname : host.getPublicHostName();
- AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
- hostname, publicHostName, hash, definitions);
-
- command.addConfigs(configHelper, cluster);
- commands.add(command);
- }
-
- return commands;
- }
-
public void stop() {
heartbeatMonitor.shutdown();
heartbeatProcessor.stopAsync();
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 29db219..c5caf85 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -171,7 +171,9 @@ public class HeartbeatMonitor implements Runnable {
!sch.getState().equals(State.UNINSTALLED) &&
!sch.getState().equals(State.DISABLED)) {
LOG.warn("Setting component state to UNKNOWN for component " + sc.getName() + " on " + host);
+ State oldState = sch.getState();
sch.setState(State.UNKNOWN);
+ sch.setLastValidState(oldState);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
index ef9b0f2..466b24c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -56,15 +56,12 @@ import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostHealthStatus;
import org.apache.ambari.server.state.HostState;
-import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
-import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.UpgradeState;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.host.HostStatusUpdatesReceivedEvent;
@@ -297,51 +294,7 @@ public class HeartbeatProcessor extends AbstractService{
}
if (calculateHostStatus) {
- //Use actual component status to compute the host status
- int masterCount = 0;
- int mastersRunning = 0;
- int slaveCount = 0;
- int slavesRunning = 0;
-
- Cluster cluster = clusterFsm.getCluster(clusterId);
-
- List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostName);
- for (ServiceComponentHost scHost : scHosts) {
- StackId stackId = scHost.getDesiredStackId();
-
- ComponentInfo componentInfo =
- ambariMetaInfo.getComponent(stackId.getStackName(),
- stackId.getStackVersion(), scHost.getServiceName(),
- scHost.getServiceComponentName());
-
- String status = scHost.getState().name();
-
- String category = componentInfo.getCategory();
-
- if (MaintenanceState.OFF == maintenanceStateHelper.getEffectiveState(scHost, host)) {
- if (category.equals("MASTER")) {
- ++masterCount;
- if (status.equals("STARTED")) {
- ++mastersRunning;
- }
- } else if (category.equals("SLAVE")) {
- ++slaveCount;
- if (status.equals("STARTED")) {
- ++slavesRunning;
- }
- }
- }
- }
-
- if (masterCount == mastersRunning && slaveCount == slavesRunning) {
- healthStatus = HostHealthStatus.HealthStatus.HEALTHY;
- } else if (masterCount > 0 && mastersRunning < masterCount) {
- healthStatus = HostHealthStatus.HealthStatus.UNHEALTHY;
- } else {
- healthStatus = HostHealthStatus.HealthStatus.ALERT;
- }
-
- host.setStatus(healthStatus.name());
+ host.calculateHostStatus(clusterId);
}
//If host doesn't belong to any cluster
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
index 11f299c..f966386 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,6 +19,8 @@
package org.apache.ambari.server.agent.stomp;
import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
@@ -38,9 +40,17 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha
private T data;
+ //TODO perhaps need optimization
+ private Lock lock = new ReentrantLock();
+
public T getUpdateIfChanged(String agentHash) throws AmbariException {
- initializeDataIfNeeded(true);
- return !Objects.equals(agentHash, data.getHash()) ? data : getEmptyData();
+ lock.lock(); // acquire before try: finally must never unlock a lock this thread does not hold
+ try {
+ initializeDataIfNeeded(true);
+ return !Objects.equals(agentHash, data.getHash()) ? data : getEmptyData();
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -71,7 +81,12 @@ public abstract class AgentClusterDataHolder<T extends AmbariUpdateEvent & Hasha
}
protected final void regenerateHash() {
- regenerateHash(data);
+ lock.lock(); // acquire before try: finally must never unlock a lock this thread does not hold
+ try {
+ regenerateHash(data);
+ } finally {
+ lock.unlock();
+ }
}
protected final void initializeDataIfNeeded(boolean regenerateHash) throws AmbariException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
index 54d8c23..50779ff 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java
@@ -40,28 +40,28 @@ public class AgentConfigsHolder extends AgentHostDataHolder<AgentConfigsUpdateEv
private Provider<Clusters> clusters;
@Override
- public AgentConfigsUpdateEvent getCurrentData(String hostName) throws AmbariException {
- return configHelper.getHostActualConfigs(hostName);
+ public AgentConfigsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
+ return configHelper.getHostActualConfigs(hostId);
}
protected boolean handleUpdate(AgentConfigsUpdateEvent update) throws AmbariException {
- setData(update, update.getHostName());
+ setData(update, update.getHostId());
return true;
}
- public void updateData(Long clusterId, List<String> hostNames) throws AmbariException {
- if (CollectionUtils.isEmpty(hostNames)) {
+ public void updateData(Long clusterId, List<Long> hostIds) throws AmbariException {
+ if (CollectionUtils.isEmpty(hostIds)) {
// TODO cluster configs will be created before hosts assigning
if (CollectionUtils.isEmpty(clusters.get().getCluster(clusterId).getHosts())) {
- hostNames = clusters.get().getHosts().stream().map(Host::getHostName).collect(Collectors.toList());
+ hostIds = clusters.get().getHosts().stream().map(Host::getHostId).collect(Collectors.toList());
} else {
- hostNames = clusters.get().getCluster(clusterId).getHosts().stream().map(Host::getHostName).collect(Collectors.toList());
+ hostIds = clusters.get().getCluster(clusterId).getHosts().stream().map(Host::getHostId).collect(Collectors.toList());
}
}
- for (String hostName : hostNames) {
- AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostName);
- agentConfigsUpdateEvent.setHostName(hostName);
+ for (Long hostId : hostIds) {
+ AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostId);
+ agentConfigsUpdateEvent.setHostId(hostId);
updateData(agentConfigsUpdateEvent);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
index 5ea5f06..0a46ce1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java
@@ -29,7 +29,6 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
-import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
import com.google.inject.Injector;
@@ -55,30 +54,30 @@ public class AgentCurrentDataController {
alertDefinitionsHolder = injector.getInstance(AlertDefinitionsHolder.class);
}
- @SubscribeMapping("/topologies")
+ @MessageMapping("/topologies")
public TopologyUpdateEvent getCurrentTopology(Hash hash) throws AmbariException, InvalidStateTransitionException {
return topologyHolder.getUpdateIfChanged(hash.getHash());
}
- @SubscribeMapping("/metadata")
+ @MessageMapping("/metadata")
public MetadataUpdateEvent getCurrentMetadata(Hash hash) throws AmbariException {
return metadataHolder.getUpdateIfChanged(hash.getHash());
}
- @SubscribeMapping("/alert_definitions")
+ @MessageMapping("/alert_definitions")
public AlertDefinitionsUpdateEvent getAlertDefinitions(@Header String simpSessionId, Hash hash) throws AmbariException {
- String hostName = agentSessionManager.getHost(simpSessionId).getHostName();
- return alertDefinitionsHolder.getUpdateIfChanged(hash.getHash(), hostName);
+ Long hostId = agentSessionManager.getHost(simpSessionId).getHostId();
+ return alertDefinitionsHolder.getUpdateIfChanged(hash.getHash(), hostId);
}
- @SubscribeMapping("/configs")
+ @MessageMapping("/configs")
public AgentConfigsUpdateEvent getCurrentConfigs(@Header String simpSessionId, Hash hash) throws AmbariException {
- return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName());
+ return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostId());
}
- @SubscribeMapping("/host_level_params")
+ @MessageMapping("/host_level_params")
public HostLevelParamsUpdateEvent getCurrentHostLevelParams(@Header String simpSessionId, Hash hash) throws AmbariException {
- return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName());
+ return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostId());
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
index 746b755..7c540f9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentHostDataHolder.java
@@ -38,24 +38,24 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
@Inject
private StateUpdateEventPublisher stateUpdateEventPublisher;
- private final Map<String, T> data = new ConcurrentHashMap<>();
+ private final Map<Long, T> data = new ConcurrentHashMap<>();
- protected abstract T getCurrentData(String hostName) throws AmbariException;
+ protected abstract T getCurrentData(Long hostId) throws AmbariException;
protected abstract boolean handleUpdate(T update) throws AmbariException;
- public T getUpdateIfChanged(String agentHash, String hostName) throws AmbariException {
- T hostData = initializeDataIfNeeded(hostName, true);
+ public T getUpdateIfChanged(String agentHash, Long hostId) throws AmbariException {
+ T hostData = initializeDataIfNeeded(hostId, true);
return !Objects.equals(agentHash, hostData.getHash()) ? hostData : getEmptyData();
}
- private T initializeDataIfNeeded(String hostName, boolean regenerateHash) throws AmbariException {
- T hostData = data.get(hostName);
+ private T initializeDataIfNeeded(Long hostId, boolean regenerateHash) throws AmbariException {
+ T hostData = data.get(hostId);
if (hostData == null) {
- hostData = getCurrentData(hostName);
+ hostData = getCurrentData(hostId);
if (regenerateHash) {
regenerateHash(hostData);
}
- data.put(hostName, hostData);
+ data.put(hostId, hostData);
}
return hostData;
}
@@ -65,9 +65,9 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
* event to listeners.
*/
public final void updateData(T update) throws AmbariException {
- initializeDataIfNeeded(update.getHostName(), false);
+ initializeDataIfNeeded(update.getHostId(), false);
if (handleUpdate(update)) {
- T hostData = getData(update.getHostName());
+ T hostData = getData(update.getHostId());
regenerateHash(hostData);
update.setHash(hostData.getHash());
stateUpdateEventPublisher.publish(update);
@@ -77,28 +77,28 @@ public abstract class AgentHostDataHolder<T extends AmbariHostUpdateEvent & Hash
/**
* Reset data for the given host. Used if changes are complex and it's easier to re-create data from scratch.
*/
- public final void resetData(String hostName) throws AmbariException {
- T newData = getCurrentData(hostName);
- data.replace(hostName, newData);
+ public final void resetData(Long hostId) throws AmbariException {
+ T newData = getCurrentData(hostId);
+ data.replace(hostId, newData);
stateUpdateEventPublisher.publish(newData);
}
/**
* Remove data for the given host.
*/
- public final void onHostRemoved(String hostName) {
- data.remove(hostName);
+ public final void onHostRemoved(String hostId) {
+ data.remove(hostId);
}
- public Map<String, T> getData() {
+ public Map<Long, T> getData() {
return data;
}
- public T getData(String hostName) {
- return data.get(hostName);
+ public T getData(Long hostId) {
+ return data.get(hostId);
}
- public void setData(T data, String hostName) {
- this.data.put(hostName, data);
+ public void setData(T data, Long hostId) {
+ this.data.put(hostId, data);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
index 5599254..ccfbc75 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -25,6 +25,8 @@ import java.util.Map;
import javax.ws.rs.WebApplicationException;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.AgentReport;
+import org.apache.ambari.server.agent.AgentReportsProcessor;
import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ComponentStatus;
@@ -40,7 +42,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
-import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
import com.google.inject.Injector;
@@ -52,13 +53,15 @@ public class AgentReportsController {
private static final Logger LOG = LoggerFactory.getLogger(AgentReportsController.class);
private final HeartBeatHandler hh;
private final AgentSessionManager agentSessionManager;
+ private final AgentReportsProcessor agentReportsProcessor;
public AgentReportsController(Injector injector) {
hh = injector.getInstance(HeartBeatHandler.class);
agentSessionManager = injector.getInstance(AgentSessionManager.class);
+ agentReportsProcessor = injector.getInstance(AgentReportsProcessor.class);
}
- @SubscribeMapping("/component_status")
+ @MessageMapping("/component_status")
public void handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message)
throws WebApplicationException, InvalidStateTransitionException, AmbariException {
List<ComponentStatus> statuses = new ArrayList<>();
@@ -73,11 +76,11 @@ public class AgentReportsController {
}
}
- hh.handleComponentReportStatus(statuses,
- agentSessionManager.getHost(simpSessionId).getHostName());
+ agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
+ statuses, null, null));
}
- @SubscribeMapping("/commands_status")
+ @MessageMapping("/commands_status")
public void handleCommandReportStatus(@Header String simpSessionId, CommandStatusReports message)
throws WebApplicationException, InvalidStateTransitionException, AmbariException {
List<CommandReport> statuses = new ArrayList<>();
@@ -85,16 +88,17 @@ public class AgentReportsController {
statuses.addAll(clusterReport.getValue());
}
- hh.handleCommandReportStatus(statuses,
- agentSessionManager.getHost(simpSessionId).getHostName());
+ agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
+ null, statuses, null));
}
- @SubscribeMapping("/host_status")
+ @MessageMapping("/host_status")
public void handleHostReportStatus(@Header String simpSessionId, HostStatusReport message) throws AmbariException {
- hh.handleHostReportStatus(message, agentSessionManager.getHost(simpSessionId).getHostName());
+ agentReportsProcessor.addAgentReport(new AgentReport(agentSessionManager.getHost(simpSessionId).getHostName(),
+ null, null, message));
}
- @SubscribeMapping("/alerts_status")
+ @MessageMapping("/alerts_status")
public void handleAlertsStatus(@Header String simpSessionId, Alert[] message) throws AmbariException {
String hostName = agentSessionManager.getHost(simpSessionId).getHostName();
List<Alert> alerts = Arrays.asList(message);
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java
new file mode 100644
index 0000000..17518ad
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentsRegistrationQueue.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent.stomp;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Injector;
+
+/**
+ * Processing many registration/topology/metadata etc. requests from agents simultaneously during
+ * agent registration can cause response timeouts on the agents' side. So only requests from a limited
+ * number of agents, whose session ids are held in {@code registrationQueue}, are processed at the same time.
+ * The queue has limited capacity: a session id is added when an agent connects to the server and is released
+ * on the first heartbeat, on disconnect from the server or, at the latest, 60 seconds after it was offered.
+ */
+public class AgentsRegistrationQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(AgentsRegistrationQueue.class);
+ private final BlockingQueue<String> registrationQueue;
+ private final ThreadFactory threadFactoryExecutor = new ThreadFactoryBuilder().setNameFormat("agents-queue-%d").build();
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactoryExecutor);
+
+ public AgentsRegistrationQueue(Injector injector) {
+ Configuration configuration = injector.getInstance(Configuration.class);
+ registrationQueue = new ArrayBlockingQueue<>(configuration.getAgentsRegistrationQueueSize());
+ }
+
+ public boolean offer(String sessionId) {
+ boolean offered = registrationQueue.offer(sessionId);
+ scheduledExecutorService.schedule(new CompleteJob(sessionId, registrationQueue), 60, TimeUnit.SECONDS);
+ return offered;
+ }
+
+ public void complete(String sessionId) {
+ registrationQueue.remove(sessionId);
+ }
+
+ private static class CompleteJob implements Runnable { // static: gets the queue explicitly, needs no outer instance
+ private final String sessionId;
+ private final BlockingQueue<String> registrationQueue;
+
+ public CompleteJob(String sessionId, BlockingQueue<String> registrationQueue) {
+ this.sessionId = sessionId;
+ this.registrationQueue = registrationQueue;
+ }
+
+ @Override
+ public void run() {
+ registrationQueue.remove(sessionId); // no-op if the session id was already released
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
index 6c6bdd4..9c3f9b5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AlertDefinitionsHolder.java
@@ -67,9 +67,10 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
}
@Override
- protected AlertDefinitionsUpdateEvent getCurrentData(String hostName) throws AmbariException {
+ protected AlertDefinitionsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
Map<Long, AlertCluster> result = new TreeMap<>();
- Map<Long, Map<Long, AlertDefinition>> alertDefinitions = helper.get().getAlertDefinitions(hostName);
+ Map<Long, Map<Long, AlertDefinition>> alertDefinitions = helper.get().getAlertDefinitions(hostId);
+ String hostName = clusters.get().getHostById(hostId).getHostName();
long count = 0;
for (Map.Entry<Long, Map<Long, AlertDefinition>> e : alertDefinitions.entrySet()) {
Long clusterId = e.getKey();
@@ -78,7 +79,7 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
count += definitionMap.size();
}
LOG.info("Loaded {} alert definitions for {} clusters for host {}", count, result.size(), hostName);
- return new AlertDefinitionsUpdateEvent(CREATE, result, hostName);
+ return new AlertDefinitionsUpdateEvent(CREATE, result, hostName, hostId);
}
@Override
@@ -93,9 +94,9 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
return false;
}
- String hostName = update.getHostName();
+ Long hostId = update.getHostId();
boolean changed = false;
- Map<Long, AlertCluster> existingClusters = getData(hostName).getClusters();
+ Map<Long, AlertCluster> existingClusters = getData(hostId).getClusters();
switch (update.getEventType()) {
case UPDATE:
@@ -106,7 +107,7 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
for (Map.Entry<Long, AlertCluster> e : updateClusters.entrySet()) {
changed |= existingClusters.get(e.getKey()).handleUpdate(update.getEventType(), e.getValue());
}
- LOG.debug("Handled {} of alerts for {} cluster(s) on host {}, changed = {}", update.getEventType(), updateClusters.size(), hostName, changed);
+ LOG.debug("Handled {} of alerts for {} cluster(s) on host with id {}, changed = {}", update.getEventType(), updateClusters.size(), hostId, changed);
break;
case CREATE:
if (!updateClusters.isEmpty()) {
@@ -127,25 +128,26 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
}
@Subscribe
- public void onAlertDefinitionRegistered(AlertDefinitionRegistrationEvent event) {
+ public void onAlertDefinitionRegistered(AlertDefinitionRegistrationEvent event) throws AmbariException {
handleSingleDefinitionChange(UPDATE, event.getDefinition());
}
@Subscribe
- public void onAlertDefinitionChanged(AlertDefinitionChangedEvent event) {
+ public void onAlertDefinitionChanged(AlertDefinitionChangedEvent event) throws AmbariException {
handleSingleDefinitionChange(UPDATE, event.getDefinition());
}
@Subscribe
- public void onAlertDefinitionDeleted(AlertDefinitionDeleteEvent event) {
+ public void onAlertDefinitionDeleted(AlertDefinitionDeleteEvent event) throws AmbariException {
handleSingleDefinitionChange(DELETE, event.getDefinition());
}
@Subscribe
- public void onServiceComponentInstalled(ServiceComponentInstalledEvent event) {
+ public void onServiceComponentInstalled(ServiceComponentInstalledEvent event) throws AmbariException {
String hostName = event.getHostName();
String serviceName = event.getServiceName();
String componentName = event.getComponentName();
+ Long hostId = clusters.get().getHost(hostName).getHostId();
Map<Long, AlertDefinition> definitions = helper.get().findByServiceComponent(event.getClusterId(), serviceName, componentName);
@@ -162,18 +164,19 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
}
Map<Long, AlertCluster> map = Collections.singletonMap(event.getClusterId(), new AlertCluster(definitions, hostName));
- safelyUpdateData(new AlertDefinitionsUpdateEvent(UPDATE, map, hostName));
+ safelyUpdateData(new AlertDefinitionsUpdateEvent(UPDATE, map, hostName, hostId));
}
@Subscribe
- public void onServiceComponentUninstalled(ServiceComponentUninstalledEvent event) {
+ public void onServiceComponentUninstalled(ServiceComponentUninstalledEvent event) throws AmbariException {
String hostName = event.getHostName();
+ Long hostId = clusters.get().getHost(hostName).getHostId();
Map<Long, AlertDefinition> definitions = helper.get().findByServiceComponent(event.getClusterId(), event.getServiceName(), event.getComponentName());
if (event.isMasterComponent()) {
definitions.putAll(helper.get().findByServiceMaster(event.getClusterId(), event.getServiceName()));
}
Map<Long, AlertCluster> map = Collections.singletonMap(event.getClusterId(), new AlertCluster(definitions, hostName));
- safelyUpdateData(new AlertDefinitionsUpdateEvent(DELETE, map, hostName));
+ safelyUpdateData(new AlertDefinitionsUpdateEvent(DELETE, map, hostName, hostId));
}
@Subscribe
@@ -191,20 +194,21 @@ public class AlertDefinitionsHolder extends AgentHostDataHolder<AlertDefinitions
}
}
- private void safelyResetData(String hostName) {
+ private void safelyResetData(Long hostId) {
try {
- resetData(hostName);
+ resetData(hostId);
} catch (AmbariException e) {
- LOG.warn(String.format("Failed to reset alert definitions for host %s", hostName), e);
+ LOG.warn(String.format("Failed to reset alert definitions for host with id %s", hostId), e);
}
}
- private void handleSingleDefinitionChange(AlertDefinitionsUpdateEvent.EventType eventType, AlertDefinition alertDefinition) {
+ private void handleSingleDefinitionChange(AlertDefinitionsUpdateEvent.EventType eventType, AlertDefinition alertDefinition) throws AmbariException {
LOG.info("{} alert definition '{}'", eventType, alertDefinition);
Set<String> hosts = helper.get().invalidateHosts(alertDefinition);
for (String hostName : hosts) {
+ Long hostId = clusters.get().getHost(hostName).getHostId();
Map<Long, AlertCluster> update = Collections.singletonMap(alertDefinition.getClusterId(), new AlertCluster(alertDefinition, hostName));
- AlertDefinitionsUpdateEvent event = new AlertDefinitionsUpdateEvent(eventType, update, hostName);
+ AlertDefinitionsUpdateEvent event = new AlertDefinitionsUpdateEvent(eventType, update, hostName, hostId);
safelyUpdateData(event);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
new file mode 100644
index 0000000..aaab7bf
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.expression.AccessException;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.PropertyAccessor;
+import org.springframework.expression.TypedValue;
+import org.springframework.expression.spel.SpelEvaluationException;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.broker.AbstractSubscriptionRegistry;
+import org.springframework.messaging.simp.broker.SubscriptionRegistry;
+import org.springframework.messaging.support.MessageHeaderAccessor;
+import org.springframework.util.AntPathMatcher;
+import org.springframework.util.Assert;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.PathMatcher;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Implementation of {@link SubscriptionRegistry} with a configurable cache size and optimized cache handling and
+ * destination matching.
+ */
+public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(AmbariSubscriptionRegistry.class);
+
+ private PathMatcher pathMatcher = new AntPathMatcher();
+
+ private volatile int cacheLimit;
+
+ private String selectorHeaderName = "selector";
+
+ private volatile boolean selectorHeaderInUse = false;
+
+ private final ExpressionParser expressionParser = new SpelExpressionParser();
+
+ private final DestinationCache destinationCache;
+
+ private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
+
+ public AmbariSubscriptionRegistry(int cacheLimit) {
+ this.cacheLimit = cacheLimit;
+ destinationCache = new DestinationCache();
+ }
+
+ /**
+ * Specify the {@link PathMatcher} to use.
+ */
+ public void setPathMatcher(PathMatcher pathMatcher) {
+ this.pathMatcher = pathMatcher;
+ }
+
+ /**
+ * Return the configured {@link PathMatcher}.
+ */
+ public PathMatcher getPathMatcher() {
+ return this.pathMatcher;
+ }
+
+ /**
+ * Specify the maximum number of entries for the resolved destination cache.
+ * There is no built-in default; the initial value is the one passed to the constructor.
+ */
+ public void setCacheLimit(int cacheLimit) {
+ this.cacheLimit = cacheLimit;
+ }
+
+ /**
+ * Return the maximum number of entries for the resolved destination cache.
+ */
+ public int getCacheLimit() {
+ return this.cacheLimit;
+ }
+
+ /**
+ * Configure the name of a selector header that a subscription message can
+ * have in order to filter messages based on their headers. The value of the
+ * header can use Spring EL expressions against message headers.
+ * <p>For example the following expression expects a header called "foo" to
+ * have the value "bar":
+ * <pre>
+ * headers.foo == 'bar'
+ * </pre>
+ * <p>By default this is set to "selector".
+ * @since 4.2
+ */
+ public void setSelectorHeaderName(String selectorHeaderName) {
+ Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
+ this.selectorHeaderName = selectorHeaderName;
+ }
+
+ /**
+ * Return the name for the selector header.
+ * @since 4.2
+ */
+ public String getSelectorHeaderName() {
+ return this.selectorHeaderName;
+ }
+
+
+ @Override
+ protected void addSubscriptionInternal(
+ String sessionId, String subsId, String destination, Message<?> message) {
+
+ Expression expression = null;
+ MessageHeaders headers = message.getHeaders();
+ String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
+ if (selector != null) {
+ try {
+ expression = this.expressionParser.parseExpression(selector);
+ this.selectorHeaderInUse = true;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Subscription selector: [" + selector + "]");
+ }
+ }
+ catch (Throwable ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to parse selector: " + selector, ex);
+ }
+ }
+ }
+ this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
+ this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
+ }
+
+ /**
+  * Removes a single subscription of a session and, if it was actually
+  * registered, drops it from the destination cache as well.
+  */
+ @Override
+ protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) {
+   SessionSubscriptionInfo sessionInfo = this.subscriptionRegistry.getSubscriptions(sessionId);
+   if (sessionInfo == null) {
+     return;
+   }
+   String removedFrom = sessionInfo.removeSubscription(subsId);
+   if (removedFrom == null) {
+     return;
+   }
+   this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId);
+ }
+
+ /**
+  * Forgets every subscription of the given session and purges the session
+  * from the destination cache. Does nothing for an unknown session.
+  */
+ @Override
+ public void unregisterAllSubscriptions(String sessionId) {
+   SessionSubscriptionInfo removedSession = this.subscriptionRegistry.removeSubscriptions(sessionId);
+   if (removedSession == null) {
+     return;
+   }
+   this.destinationCache.updateAfterRemovedSession(removedSession);
+ }
+
+ /**
+  * Resolves the subscriptions for a destination through the destination
+  * cache and then applies selector filtering for the given message.
+  */
+ @Override
+ protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
+   return filterSubscriptions(this.destinationCache.getSubscriptions(destination, message), message);
+ }
+
+ /**
+  * Narrows the matched subscriptions down to those whose selector accepts
+  * the message.
+  * <p>If no subscription has ever supplied a selector, {@code allMatches} is
+  * returned as is. Otherwise a new map is built: subscriptions without a
+  * selector are always kept, subscriptions with a selector are kept only when
+  * the selector evaluates to {@code true}. Sessions or subscriptions that are
+  * no longer registered are skipped, and a selector that fails to evaluate is
+  * logged at debug level and treated as not matching.
+  *
+  * @param allMatches sessionId -> subscription ids matched by destination
+  * @param message the message being routed; root object for selectors
+  * @return the filtered sessionId -> subscription ids map
+  */
+ private MultiValueMap<String, String> filterSubscriptions(
+     MultiValueMap<String, String> allMatches, Message<?> message) {
+
+   if (!this.selectorHeaderInUse) {
+     return allMatches;
+   }
+   // Created lazily: many lookups never reach a subscription with a selector.
+   EvaluationContext context = null;
+   MultiValueMap<String, String> result = new LinkedMultiValueMap<>(allMatches.size());
+   for (Map.Entry<String, List<String>> match : allMatches.entrySet()) {
+     String sessionId = match.getKey();
+     // The session info does not depend on the subscription id, so it is
+     // looked up once per session instead of once per subscription.
+     SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
+     if (info == null) {
+       continue;
+     }
+     for (String subId : match.getValue()) {
+       Subscription sub = info.getSubscription(subId);
+       if (sub == null) {
+         continue;
+       }
+       Expression expression = sub.getSelectorExpression();
+       if (expression == null) {
+         result.add(sessionId, subId);
+         continue;
+       }
+       if (context == null) {
+         context = new StandardEvaluationContext(message);
+         context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor());
+       }
+       try {
+         // A selector that evaluates to null counts as "no match" rather
+         // than triggering an unboxing NullPointerException.
+         if (Boolean.TRUE.equals(expression.getValue(context, Boolean.class))) {
+           result.add(sessionId, subId);
+         }
+       }
+       catch (SpelEvaluationException ex) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("Failed to evaluate selector: " + ex.getMessage());
+         }
+       }
+       catch (Throwable ex) {
+         logger.debug("Failed to evaluate selector", ex);
+       }
+     }
+   }
+   return result;
+ }
+
+ /**
+  * Describes this registry using the runtime class name followed by the
+  * summaries of the destination cache and of the session registry.
+  */
+ @Override
+ public String toString() {
+   // Use the actual class name so that log output identifies this
+   // implementation instead of a hard-coded label.
+   return getClass().getSimpleName() + "[" + this.destinationCache + ", " + this.subscriptionRegistry + "]";
+ }
+
+
+ /**
+  * A cache for destinations previously resolved via
+  * {@code findSubscriptionsInternal(String, Message)}.
+  * <p>Two structures are kept: {@code accessCache} maps a destination to the
+  * sessions/subscriptions found for it, and {@code notSubscriptionCache}
+  * remembers destinations for which no subscription was found so that they
+  * are not scanned again.
+  * <p>Values stored in {@code accessCache} are never modified after they have
+  * been published: every update replaces the value with a modified copy
+  * inside {@code computeIfPresent}, so a map handed out by
+  * {@link #getSubscriptions(String, Message)} is not changed afterwards by
+  * this cache.
+  */
+ private class DestinationCache {
+
+   /** Map from destination -> <sessionId, subscriptionId> for fast look-ups */
+   private final ConcurrentMap<String, LinkedMultiValueMap<String, String>> accessCache =
+       new ConcurrentHashMap<>(cacheLimit);
+
+   /** Destinations known to have no subscriptions; only the keys are meaningful. */
+   private final Cache<String, String> notSubscriptionCache =
+       CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
+
+   /**
+    * Returns the sessionId -> subscription ids map for a destination,
+    * resolving and caching it on first use.
+    *
+    * @param destination the destination to look up
+    * @param message the message being routed (not used by the lookup)
+    * @return the cached subscriptions, or an empty map if there are none
+    */
+   public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
+     if (notSubscriptionCache.asMap().containsKey(destination)) {
+       return new LinkedMultiValueMap<>();
+     }
+     LinkedMultiValueMap<String, String> subscriptions = this.accessCache.computeIfAbsent(destination, (key) -> {
+       LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>();
+       for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
+         //TODO temporary changed to more fast-acting check without regex, need move investigation
+         // Destinations are compared for equality only, so the session's
+         // subscriptions can be fetched directly by destination.
+         Set<Subscription> matched = info.getSubscriptions(destination);
+         if (matched != null) {
+           for (Subscription subscription : matched) {
+             result.add(info.getSessionId(), subscription.getId());
+           }
+         }
+       }
+       if (!result.isEmpty()) {
+         return result;
+       } else {
+         // Returning null leaves accessCache without a mapping; remember
+         // the miss separately.
+         notSubscriptionCache.put(destination, "");
+         return null;
+       }
+     });
+     return subscriptions == null ? new LinkedMultiValueMap<>() : subscriptions;
+   }
+
+   /**
+    * Adds a new subscription to the cached entry of its destination, if
+    * there is one, and clears any "no subscriptions" marker for it.
+    */
+   public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
+     this.accessCache.computeIfPresent(destination, (key, value) -> {
+       List<String> existing = value.get(sessionId);
+       if (existing != null && existing.contains(subsId)) {
+         // Already present, e.g. picked up by a concurrent lookup.
+         return value;
+       }
+       LinkedMultiValueMap<String, String> subs = value.deepCopy();
+       subs.add(sessionId, subsId);
+       return subs;
+     });
+     // Must happen after the step above: a lookup that started before the
+     // subscription was registered may just have recorded a miss for this
+     // destination, which would otherwise hide the new subscription.
+     this.notSubscriptionCache.invalidate(destination);
+   }
+
+   /**
+    * Removes one subscription of a session from every cached destination,
+    * dropping destinations that are left without subscriptions.
+    */
+   public void updateAfterRemovedSubscription(String sessionId, String subsId) {
+     for (String destination : this.accessCache.keySet()) {
+       this.accessCache.computeIfPresent(destination, (key, sessionMap) -> {
+         if (sessionMap.get(sessionId) == null) {
+           return sessionMap;
+         }
+         // Work on a copy; the published value may be in use by readers.
+         LinkedMultiValueMap<String, String> updated = sessionMap.deepCopy();
+         List<String> subscriptions = updated.get(sessionId);
+         subscriptions.remove(subsId);
+         if (subscriptions.isEmpty()) {
+           updated.remove(sessionId);
+         }
+         return updated.isEmpty() ? null : updated;
+       });
+     }
+   }
+
+   /**
+    * Removes a session from every cached destination, dropping destinations
+    * that are left without subscriptions.
+    */
+   public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
+     String sessionId = info.getSessionId();
+     for (String destination : this.accessCache.keySet()) {
+       this.accessCache.computeIfPresent(destination, (key, sessionMap) -> {
+         if (!sessionMap.containsKey(sessionId)) {
+           return sessionMap;
+         }
+         // Work on a copy; the published value may be in use by readers.
+         LinkedMultiValueMap<String, String> updated = sessionMap.deepCopy();
+         updated.remove(sessionId);
+         return updated.isEmpty() ? null : updated;
+       });
+     }
+   }
+
+   @Override
+   public String toString() {
+     return "cache[" + this.accessCache.size() + " destination(s)]";
+   }
+ }
+
+
+ /**
+  * Keeps the {@link SessionSubscriptionInfo} of every session, keyed by
+  * session id.
+  */
+ private static class SessionSubscriptionRegistry {
+
+   // sessionId -> SessionSubscriptionInfo
+   private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = new ConcurrentHashMap<>();
+
+   /** Returns the subscriptions of a session, or {@code null} if the session is unknown. */
+   public SessionSubscriptionInfo getSubscriptions(String sessionId) {
+     return sessions.get(sessionId);
+   }
+
+   /** Returns a view of the subscription info of all known sessions. */
+   public Collection<SessionSubscriptionInfo> getAllSubscriptions() {
+     return sessions.values();
+   }
+
+   /**
+    * Records a subscription for a session, creating the session entry on
+    * first use.
+    *
+    * @return the info object of the session the subscription was added to
+    */
+   public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
+       String destination, Expression selectorExpression) {
+
+     SessionSubscriptionInfo sessionInfo =
+         sessions.computeIfAbsent(sessionId, SessionSubscriptionInfo::new);
+     sessionInfo.addSubscription(destination, subscriptionId, selectorExpression);
+     return sessionInfo;
+   }
+
+   /** Removes a session and returns its info, or {@code null} if it was unknown. */
+   public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
+     return sessions.remove(sessionId);
+   }
+
+   @Override
+   public String toString() {
+     return "registry[" + sessions.size() + " sessions]";
+   }
+ }
+
+
+ /**
+ * Hold subscriptions for a session.
+ */
+ private static class SessionSubscriptionInfo {
+
+ private final String sessionId;
+
+ // destination -> subscriptions
+ private final Map<String, Set<Subscription>> destinationLookup =
+ new ConcurrentHashMap<String, Set<Subscription>>(4);
+
+ public SessionSubscriptionInfo(String sessionId) {
+ Assert.notNull(sessionId, "'sessionId' must not be null");
+ this.sessionId = sessionId;
+ }
+
+ public String getSessionId() {
+ return this.sessionId;
+ }
+
+ public Set<String> getDestinations() {
+ return this.destinationLookup.keySet();
+ }
+
+ public Set<Subscription> getSubscriptions(String destination) {
+ return this.destinationLookup.get(destination);
+ }
+
+ public Subscription getSubscription(String subscriptionId) {
+ for (String destination : this.destinationLookup.keySet()) {
+ Set<Subscription> subs = this.destinationLookup.get(destination);
+ if (subs != null) {
+ for (Subscription sub : subs) {
+ if (sub.getId().equalsIgnoreCase(subscriptionId)) {
+ return sub;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) {
+ Set<Subscription> subs = this.destinationLookup.get(destination);
+ if (subs == null) {
+ synchronized (this.destinationLookup) {
+ subs = this.destinationLookup.get(destination);
+ if (subs == null) {
+ subs = new CopyOnWriteArraySet<Subscription>();
+ this.destinationLookup.put(destination, subs);
+ }
+ }
+ }
+ subs.add(new Subscription(subscriptionId, selectorExpression));
+ }
+
+ public String removeSubscription(String subscriptionId) {
+ for (String destination : this.destinationLookup.keySet()) {
+ Set<Subscription> subs = this.destinationLookup.get(destination);
+ if (subs != null) {
+ for (Subscription sub : subs) {
+ if (sub.getId().equals(subscriptionId) && subs.remove(sub)) {
+ synchronized (this.destinationLookup) {
+ if (subs.isEmpty()) {
+ this.destinationLookup.remove(destination);
+ }
+ }
+ return destination;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "[sessionId=" + this.sessionId + ", subscriptions=" + this.destinationLookup + "]";
+ }
+ }
+
+
+ private static final class Subscription {
+
+ private final String id;
+
+ private final Expression selectorExpression;
+
+ public Subscription(String id, Expression selector) {
+ Assert.notNull(id, "Subscription id must not be null");
+ this.id = id;
+ this.selectorExpression = selector;
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ public Expression getSelectorExpression() {
+ return this.selectorExpression;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id)));
+ }
+
+ @Override
+ public int hashCode() {
+ return this.id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "subscription(id=" + this.id + ")";
+ }
+ }
+
+
+ /**
+  * Read-only SpEL property accessor for {@link MessageHeaders}. The property
+  * "destination" (case-insensitive) resolves to the message destination; any
+  * other name resolves to the first native header of that name and, failing
+  * that, to the plain message header of that name.
+  * <p>Values are read directly from the headers map, so no header accessor
+  * instance needs to be attached to the headers.
+  */
+ private static class SimpMessageHeaderPropertyAccessor implements PropertyAccessor {
+
+   @Override
+   public Class<?>[] getSpecificTargetClasses() {
+     return new Class<?>[] {MessageHeaders.class};
+   }
+
+   @Override
+   public boolean canRead(EvaluationContext context, Object target, String name) {
+     return true;
+   }
+
+   @Override
+   public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException {
+     MessageHeaders headers = (MessageHeaders) target;
+     Object value;
+     if ("destination".equalsIgnoreCase(name)) {
+       value = SimpMessageHeaderAccessor.getDestination(headers);
+     }
+     else {
+       value = SimpMessageHeaderAccessor.getFirstNativeHeader(name, headers);
+       if (value == null) {
+         value = headers.get(name);
+       }
+     }
+     return new TypedValue(value);
+   }
+
+   @Override
+   public boolean canWrite(EvaluationContext context, Object target, String name) {
+     return false;
+   }
+
+   /** Writing is not supported; this method does nothing. */
+   @Override
+   public void write(EvaluationContext context, Object target, String name, Object value) {
+   }
+ }
+
+}