You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/10/05 10:46:38 UTC
[3/4] ambari git commit: AMBARI-22063. Poor performance of STOMP
subscriptions cache and registration handling. (mpapirkovskyy)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
index 81fb300..3bf9e79 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
@@ -17,6 +17,16 @@
*/
package org.apache.ambari.server.agent.stomp;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import javax.ws.rs.WebApplicationException;
import org.apache.ambari.server.AmbariException;
@@ -27,78 +37,138 @@ import org.apache.ambari.server.agent.HeartBeatResponse;
import org.apache.ambari.server.agent.Register;
import org.apache.ambari.server.agent.RegistrationResponse;
import org.apache.ambari.server.agent.RegistrationStatus;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.configuration.spring.GuiceBeansConfig;
import org.apache.ambari.server.state.cluster.ClustersImpl;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Import;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
-import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Injector;
+import com.google.inject.persist.UnitOfWork;
@Controller
@SendToUser("/")
@MessageMapping("/")
+@Import(GuiceBeansConfig.class)
public class HeartbeatController {
- private static Log LOG = LogFactory.getLog(HeartbeatController.class);
+ private static Logger LOG = LoggerFactory.getLogger(HeartbeatController.class);
private final HeartBeatHandler hh;
private final ClustersImpl clusters;
private final AgentSessionManager agentSessionManager;
+ private final LinkedBlockingQueue queue;
+ private final ThreadFactory threadFactoryExecutor = new ThreadFactoryBuilder().setNameFormat("agent-register-processor-%d").build();
+ private final ThreadFactory threadFactoryTimeout = new ThreadFactoryBuilder().setNameFormat("agent-register-timeout-%d").build();
+ private final ExecutorService executor;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final UnitOfWork unitOfWork;
+
+ @Autowired
+ private AgentsRegistrationQueue agentsRegistrationQueue;
public HeartbeatController(Injector injector) {
hh = injector.getInstance(HeartBeatHandler.class);
clusters = injector.getInstance(ClustersImpl.class);
+ unitOfWork = injector.getInstance(UnitOfWork.class);
agentSessionManager = injector.getInstance(AgentSessionManager.class);
+
+ Configuration configuration = injector.getInstance(Configuration.class);
+ queue = new LinkedBlockingQueue(configuration.getAgentsRegistrationQueueSize());
+ executor = new ThreadPoolExecutor(configuration.getRegistrationThreadPoolSize(),
+ configuration.getRegistrationThreadPoolSize(), 0L, TimeUnit.MILLISECONDS, queue, threadFactoryExecutor);
+ scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactoryTimeout);
}
- @SubscribeMapping("/register")
- public RegistrationResponse register(@Header String simpSessionId, Register message)
+ @MessageMapping("/register")
+ public CompletableFuture<RegistrationResponse> register(@Header String simpSessionId, Register message)
throws WebApplicationException, InvalidStateTransitionException, AmbariException {
+ CompletableFuture<RegistrationResponse> completableFuture = new CompletableFuture<>();
- /* Call into the heartbeat handler */
+ Future<RegistrationResponse> future = executor.submit(() -> {
+ try {
+ unitOfWork.begin();
+ RegistrationResponse response = null;
+ try {
+ /* Call into the heartbeat handler */
+ response = hh.handleRegistration(message);
+ agentSessionManager.register(simpSessionId,
+ clusters.getHost(message.getHostname()));
+ LOG.debug("Sending registration response " + response);
+ } catch (Exception ex) {
+ LOG.info(ex.getMessage(), ex);
+ response = new RegistrationResponse();
+ response.setResponseId(-1);
+ response.setResponseStatus(RegistrationStatus.FAILED);
+ response.setExitstatus(1);
+ response.setLog(ex.getMessage());
+ completableFuture.complete(response);
+ return response;
+ }
+ completableFuture.complete(response);
+ return response;
+ } finally {
+ unitOfWork.end();
+ }
+ });
- RegistrationResponse response = null;
- try {
- response = hh.handleRegistration(message);
- LOG.debug("Sending registration response " + response);
- } catch (AmbariException ex) {
- response = new RegistrationResponse();
- response.setResponseId(-1);
- response.setResponseStatus(RegistrationStatus.FAILED);
- response.setExitstatus(1);
- response.setLog(ex.getMessage());
- return response;
- }
- agentSessionManager.register(simpSessionId,
- clusters.getHost(message.getHostname()));
- return response;
+ scheduledExecutorService.schedule(new RegistrationTimeoutTask(future, completableFuture), 8, TimeUnit.SECONDS);
+ return completableFuture;
}
- @SubscribeMapping("/heartbeat")
+ @MessageMapping("/heartbeat")
public HeartBeatResponse heartbeat(@Header String simpSessionId, HeartBeat message) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received Heartbeat message " + message);
- }
- HeartBeatResponse heartBeatResponse;
try {
- if (!agentSessionManager.isRegistered(simpSessionId)) {
- //Server restarted, or unknown host.
- LOG.error(String.format("Host with [%s] sessionId not registered", simpSessionId));
- return hh.createRegisterCommand();
- }
- message.setHostname(agentSessionManager.getHost(simpSessionId).getHostName());
- heartBeatResponse = hh.handleHeartBeat(message);
+ unitOfWork.begin();
if (LOG.isDebugEnabled()) {
- LOG.debug("Sending heartbeat response with response id " + heartBeatResponse.getResponseId());
- LOG.debug("Response details " + heartBeatResponse);
+ LOG.debug("Received Heartbeat message " + message);
+ }
+ HeartBeatResponse heartBeatResponse;
+ try {
+ if (!agentSessionManager.isRegistered(simpSessionId)) {
+ //Server restarted, or unknown host.
+ LOG.error(String.format("Host with [%s] sessionId not registered", simpSessionId));
+ return hh.createRegisterCommand();
+ }
+ message.setHostname(agentSessionManager.getHost(simpSessionId).getHostName());
+ heartBeatResponse = hh.handleHeartBeat(message);
+ agentsRegistrationQueue.complete(simpSessionId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending heartbeat response with response id " + heartBeatResponse.getResponseId());
+ LOG.debug("Response details " + heartBeatResponse);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error in HeartBeat", e);
+ throw new WebApplicationException(500);
+ }
+ return heartBeatResponse;
+ } finally {
+ unitOfWork.end();
+ }
+ }
+
+ private class RegistrationTimeoutTask implements Runnable {
+ private Future<RegistrationResponse> task;
+ private CompletableFuture<RegistrationResponse> completableFuture;
+
+ public RegistrationTimeoutTask(Future<RegistrationResponse> task, CompletableFuture<RegistrationResponse> completableFuture) {
+
+ this.task = task;
+ this.completableFuture = completableFuture;
+ }
+
+ @Override
+ public void run() {
+ boolean cancelled = task.cancel(false);
+ if (cancelled) {
+ completableFuture.cancel(false);
}
- } catch (Exception e) {
- LOG.warn("Error in HeartBeat", e);
- throw new WebApplicationException(500);
}
- return heartBeatResponse;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
index 4e6d37c..7e02590 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
@@ -44,14 +44,14 @@ public class HostLevelParamsHolder extends AgentHostDataHolder<HostLevelParamsUp
private Clusters clusters;
@Override
- public HostLevelParamsUpdateEvent getCurrentData(String hostName) throws AmbariException {
+ public HostLevelParamsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
TreeMap<String, HostLevelParamsCluster> hostLevelParamsClusters = new TreeMap<>();
- for (Cluster cl : clusters.getClustersForHost(hostName)) {
- Host host = clusters.getHost(hostName);
+ Host host = clusters.getHostById(hostId);
+ for (Cluster cl : clusters.getClustersForHost(host.getHostName())) {
//TODO fix repo info host param
HostLevelParamsCluster hostLevelParamsCluster = new HostLevelParamsCluster(
null,//ambariMetaInfo.getRepoInfo(cl, host),
- recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), hostName));
+ recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), host.getHostName()));
hostLevelParamsClusters.put(Long.toString(cl.getClusterId()),
hostLevelParamsCluster);
@@ -62,7 +62,7 @@ public class HostLevelParamsHolder extends AgentHostDataHolder<HostLevelParamsUp
protected boolean handleUpdate(HostLevelParamsUpdateEvent update) throws AmbariException {
//TODO implement update host level params process
- setData(update, update.getHostName());
+ setData(update, update.getHostId());
return true;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
index 104f278..2bf7e0a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java
index 6e01cac..22a28f0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ClusterConfigs.java
@@ -18,33 +18,33 @@
package org.apache.ambari.server.agent.stomp.dto;
-import java.util.Map;
+import java.util.SortedMap;
import com.fasterxml.jackson.annotation.JsonInclude;
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ClusterConfigs {
- private Map<String, Map<String, String>> configurations;
- private Map<String, Map<String, Map<String, String>>> configurationAttributes;
+ private SortedMap<String, SortedMap<String, String>> configurations;
+ private SortedMap<String, SortedMap<String, SortedMap<String, String>>> configurationAttributes;
- public ClusterConfigs(Map<String, Map<String, String>> configurations, Map<String, Map<String, Map<String, String>>> configurationAttributes) {
+ public ClusterConfigs(SortedMap<String, SortedMap<String, String>> configurations, SortedMap<String, SortedMap<String, SortedMap<String, String>>> configurationAttributes) {
this.configurations = configurations;
this.configurationAttributes = configurationAttributes;
}
- public Map<String, Map<String, String>> getConfigurations() {
+ public SortedMap<String, SortedMap<String, String>> getConfigurations() {
return configurations;
}
- public void setConfigurations(Map<String, Map<String, String>> configurations) {
+ public void setConfigurations(SortedMap<String, SortedMap<String, String>> configurations) {
this.configurations = configurations;
}
- public Map<String, Map<String, Map<String, String>>> getConfigurationAttributes() {
+ public SortedMap<String, SortedMap<String, SortedMap<String, String>>> getConfigurationAttributes() {
return configurationAttributes;
}
- public void setConfigurationAttributes(Map<String, Map<String, Map<String, String>>> configurationAttributes) {
+ public void setConfigurationAttributes(SortedMap<String, SortedMap<String, SortedMap<String, String>>> configurationAttributes) {
this.configurationAttributes = configurationAttributes;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java
index 8a62d6b..19f9597 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/Hash.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
index 8210779..5620209 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java
index a84b509..9b4bc02 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataServiceInfo.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
index c350b5f6..c17ec7f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
index 1060232..d96180d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
index 8d2b627..9cdf944 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 0e519fd..2fd22d5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -2114,6 +2114,49 @@ public class Configuration {
"messaging.threadpool.size", 1);
/**
+ * The thread pool size for agents registration.
+ */
+ @Markdown(description = "Thread pool size for agents registration")
+ public static final ConfigurationProperty<Integer> REGISTRATION_THREAD_POOL_SIZE = new ConfigurationProperty<>(
+ "registration.threadpool.size", 10);
+
+ /**
+ * Maximal cache size for spring subscription registry.
+ */
+ @Markdown(description = "Maximal cache size for spring subscription registry.")
+ public static final ConfigurationProperty<Integer> SUBSCRIPTION_REGISTRY_CACHE_MAX_SIZE = new ConfigurationProperty<>(
+ "subscription.registry.cache.size", 1500);
+
+ /**
+ * Queue size for agents in registration.
+ */
+ @Markdown(description = "Queue size for agents in registration.")
+ public static final ConfigurationProperty<Integer> AGENTS_REGISTRATION_QUEUE_SIZE = new ConfigurationProperty<>(
+ "agents.registration.queue.size", 200);
+
+
+ /**
+ * Period, in seconds, at which agents' reports will be processed.
+ */
+ @Markdown(description = "Period in seconds with agents reports will be processed.")
+ public static final ConfigurationProperty<Integer> AGENTS_REPORT_PROCESSING_PERIOD = new ConfigurationProperty<>(
+ "agents.reports.processing.period", 1);
+
+ /**
+ * Timeout, in seconds, before processing of agents' reports starts.
+ */
+ @Markdown(description = "Timeout in seconds before start processing of agents' reports.")
+ public static final ConfigurationProperty<Integer> AGENTS_REPORT_PROCESSING_START_TIMEOUT = new ConfigurationProperty<>(
+ "agents.reports.processing.start.timeout", 5);
+
+ /**
+ * Thread pool size for agents reports processing.
+ */
+ @Markdown(description = "Thread pool size for agents reports processing.")
+ public static final ConfigurationProperty<Integer> AGENTS_REPORT_THREAD_POOL_SIZE = new ConfigurationProperty<>(
+ "agents.reports.thread.pool.size", 10);
+
+ /**
* The maximum number of threads used to extract Ambari Views when Ambari
* Server is starting up.
*/
@@ -4791,6 +4834,49 @@ public class Configuration {
}
/**
+ * @return max thread pool size for agents registration, default 10
+ */
+ public int getRegistrationThreadPoolSize() {
+ return Integer.parseInt(getProperty(REGISTRATION_THREAD_POOL_SIZE));
+ }
+
+ /**
+ * @return max cache size for spring subscription registry.
+ */
+ public int getSubscriptionRegistryCacheSize() {
+ return Integer.parseInt(getProperty(SUBSCRIPTION_REGISTRY_CACHE_MAX_SIZE));
+ }
+
+ /**
+ * @return queue size for agents in registration.
+ */
+ public int getAgentsRegistrationQueueSize() {
+ return Integer.parseInt(getProperty(AGENTS_REGISTRATION_QUEUE_SIZE));
+ }
+
+
+ /**
+ * @return period, in seconds, at which agents' reports will be processed.
+ */
+ public int getAgentsReportProcessingPeriod() {
+ return Integer.parseInt(getProperty(AGENTS_REPORT_PROCESSING_PERIOD));
+ }
+
+ /**
+ * @return timeout, in seconds, before processing of agents' reports starts.
+ */
+ public int getAgentsReportProcessingStartTimeout() {
+ return Integer.parseInt(getProperty(AGENTS_REPORT_PROCESSING_START_TIMEOUT));
+ }
+
+ /**
+ * @return thread pool size for agents reports processing.
+ */
+ public int getAgentsReportThreadPoolSize() {
+ return Integer.parseInt(getProperty(AGENTS_REPORT_THREAD_POOL_SIZE));
+ }
+
+ /**
* @return max thread pool size for agents, default 25
*/
public int getAgentThreadPoolSize() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java
new file mode 100644
index 0000000..3f37353
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentRegisteringQueueChecker.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.configuration.spring;
+
+import org.apache.ambari.server.agent.stomp.AgentsRegistrationQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.messaging.simp.stomp.StompCommand;
+import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
+import org.springframework.messaging.support.ChannelInterceptorAdapter;
+import org.springframework.messaging.support.MessageBuilder;
+
+public class AgentRegisteringQueueChecker extends ChannelInterceptorAdapter {
+ private static final Logger LOG = LoggerFactory.getLogger(AgentsRegistrationQueue.class);
+
+ @Autowired
+ private AgentsRegistrationQueue agentsRegistrationQueue;
+
+ @Override
+ public Message<?> preSend(Message<?> message, MessageChannel channel) {
+ StompHeaderAccessor headerAccessor= StompHeaderAccessor.wrap(message);
+ String sessionId = headerAccessor.getHeader("simpSessionId").toString();
+ if (SimpMessageType.CONNECT_ACK.equals(headerAccessor.getMessageType())
+ && !agentsRegistrationQueue.offer(sessionId)) {
+ StompHeaderAccessor headerAccessorError = StompHeaderAccessor.create(StompCommand.ERROR);
+ headerAccessorError.setHeader("simpSessionId", sessionId);
+ headerAccessorError.setHeader("simpConnectMessage", headerAccessor.getHeader("simpConnectMessage").toString());
+ headerAccessorError.setMessage("Connection not allowed");
+
+ return MessageBuilder.createMessage(new byte[0], headerAccessorError.getMessageHeaders());
+ } else if (SimpMessageType.DISCONNECT_ACK.equals(headerAccessor.getMessageType())) {
+ agentsRegistrationQueue.complete(sessionId);
+ }
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
index 387bdda..516c405 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.configuration.spring;
import org.apache.ambari.server.agent.stomp.HeartbeatController;
import org.apache.ambari.server.api.stomp.TestController;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -32,10 +33,13 @@ import com.google.inject.Injector;
@Configuration
@EnableWebSocketMessageBroker
@ComponentScan(basePackageClasses = {TestController.class, HeartbeatController.class})
-@Import(RootStompConfig.class)
+@Import({RootStompConfig.class,GuiceBeansConfig.class})
public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
private org.apache.ambari.server.configuration.Configuration configuration;
+ @Autowired
+ private AgentRegisteringQueueChecker agentRegisteringQueueChecker;
+
public AgentStompConfig(Injector injector) {
configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class);
}
@@ -55,5 +59,6 @@ public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(configuration.getSpringMessagingThreadPoolSize());
+ registration.setInterceptors(agentRegisteringQueueChecker);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
index 50c3aba..baa9d6e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.ambari.server.configuration.spring;
+import org.apache.ambari.server.agent.stomp.AgentsRegistrationQueue;
import org.apache.ambari.server.audit.AuditLogger;
import org.apache.ambari.server.security.authorization.AmbariLdapAuthenticationProvider;
import org.apache.ambari.server.security.authorization.AmbariLocalUserProvider;
@@ -89,4 +90,15 @@ public class GuiceBeansConfig {
return injector.getInstance(AmbariPamAuthenticationProvider.class);
}
+
+ @Bean
+ public AgentRegisteringQueueChecker agentRegisteringQueueChecker() {
+ return new AgentRegisteringQueueChecker();
+ }
+
+ @Bean
+ public AgentsRegistrationQueue agentsRegistrationQueue() {
+ return new AgentsRegistrationQueue(injector);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
index 667022e..fda0607 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
@@ -22,6 +22,7 @@ import java.util.List;
import javax.servlet.ServletContext;
+import org.apache.ambari.server.agent.stomp.AmbariSubscriptionRegistry;
import org.apache.ambari.server.api.AmbariSendToMethodReturnValueHandler;
import org.apache.ambari.server.events.listeners.requests.StateUpdateListener;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
@@ -30,14 +31,14 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.support.SendToMethodReturnValueHandler;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
+import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
+import org.springframework.messaging.support.ErrorMessage;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
@@ -52,8 +53,11 @@ public class RootStompConfig {
private final ServletContext servletContext;
- public RootStompConfig(ServletContext servletContext) {
+ private final org.apache.ambari.server.configuration.Configuration configuration;
+
+ public RootStompConfig(ServletContext servletContext, Injector injector) {
this.servletContext = servletContext;
+ configuration = injector.getInstance(org.apache.ambari.server.configuration.Configuration.class);
}
@Bean
@@ -69,6 +73,13 @@ public class RootStompConfig {
}
@Autowired
+ public void configureRegistryCacheSize(SimpleBrokerMessageHandler simpleBrokerMessageHandler) throws NoSuchFieldException, IllegalAccessException {
+ AmbariSubscriptionRegistry defaultSubscriptionRegistry =
+ new AmbariSubscriptionRegistry(configuration.getSubscriptionRegistryCacheSize());
+ simpleBrokerMessageHandler.setSubscriptionRegistry(defaultSubscriptionRegistry);
+ }
+
+ @Autowired
public void configureGlobal(SimpAnnotationMethodMessageHandler messageHandler) {
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>(messageHandler.getReturnValueHandlers());
List<HandlerMethodReturnValueHandler> changedHandlers = new ArrayList<>();
@@ -94,26 +105,14 @@ public class RootStompConfig {
public static class ExceptionHandlingAdvice{
private static final Logger LOG = LoggerFactory.getLogger(ExceptionHandlingAdvice.class);
-
- @MessageExceptionHandler(MessagingException.class)
+ @MessageExceptionHandler(Exception.class)
@SendToUser("/")
- public ErrorMessage handle(MessagingException e) {
-
- LOG.error("Exception caught while processing message", e);
+ public ErrorMessage handle(Exception e) {
+ //LOG.error("Exception caught while processing message: " + e.getMessage(), e);
return new ErrorMessage(e);
}
- static class ErrorMessage {
- Message<?> failedMessage;
- String exception;
-
- ErrorMessage(MessagingException e) {
- this.failedMessage = e.getFailedMessage();
- this.exception = e.getLocalizedMessage();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index f0f13e1..4b37d00 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -174,6 +174,9 @@ public interface AmbariManagementController {
Set<ServiceComponentHostResponse> getHostComponents(
Set<ServiceComponentHostRequest> requests) throws AmbariException;
+ Set<ServiceComponentHostResponse> getHostComponents(
+ Set<ServiceComponentHostRequest> requests, boolean statusOnly) throws AmbariException;
+
/**
* Gets the configurations identified by the given request objects.
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 8769da9..817f340 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -137,6 +137,7 @@ import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExtensionDAO;
import org.apache.ambari.server.orm.dao.ExtensionLinkDAO;
import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
import org.apache.ambari.server.orm.dao.SettingDAO;
import org.apache.ambari.server.orm.dao.StackDAO;
import org.apache.ambari.server.orm.dao.WidgetDAO;
@@ -147,6 +148,7 @@ import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.OperatingSystemEntity;
import org.apache.ambari.server.orm.entities.RepositoryEntity;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
import org.apache.ambari.server.orm.entities.SettingEntity;
import org.apache.ambari.server.orm.entities.WidgetEntity;
import org.apache.ambari.server.orm.entities.WidgetLayoutEntity;
@@ -360,6 +362,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
@Inject
private Provider<AgentConfigsHolder> m_agentConfigsHolder;
+ @Inject
+ private ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO;
+
/**
* The KerberosHelper to help setup for enabling for disabling Kerberos
*/
@@ -694,16 +699,35 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
void persistServiceComponentHosts(Set<ServiceComponentHostRequest> requests)
throws AmbariException {
Multimap<Cluster, ServiceComponentHost> schMap = ArrayListMultimap.create();
+ Map<Long, Map<String, List<String>>> serviceComponentNames = new HashMap<>();
+ Map<Long, Map<String, Map<String, ServiceComponentDesiredStateEntity>>> serviceComponentDesiredStateEntities = new HashMap<>();
for (ServiceComponentHostRequest request : requests) {
Cluster cluster = clusters.getCluster(request.getClusterName());
Service s = cluster.getService(request.getServiceName());
ServiceComponent sc = s.getServiceComponent(
request.getComponentName());
+ serviceComponentNames.computeIfAbsent(sc.getClusterId(), c -> new HashMap<>())
+ .computeIfAbsent(sc.getServiceName(), h ->new ArrayList<>()).add(sc.getName());
+ }
- ServiceComponentHost sch =
- serviceComponentHostFactory.createNew(sc, request.getHostname());
+ List<ServiceComponentDesiredStateEntity> entities = serviceComponentDesiredStateDAO.findByNames(serviceComponentNames);
+
+ for (ServiceComponentDesiredStateEntity stateEntity : entities) {
+ serviceComponentDesiredStateEntities.computeIfAbsent(stateEntity.getClusterId(), c -> new HashMap<>())
+ .computeIfAbsent(stateEntity.getServiceName(), h ->new HashMap<>())
+ .putIfAbsent(stateEntity.getComponentName(), stateEntity);
+ }
+
+ for (ServiceComponentHostRequest request : requests) {
+ Cluster cluster = clusters.getCluster(request.getClusterName());
+ Service s = cluster.getService(request.getServiceName());
+ ServiceComponent sc = s.getServiceComponent(
+ request.getComponentName());
+ ServiceComponentHost sch =
+ serviceComponentHostFactory.createNew(sc, request.getHostname(),
+ serviceComponentDesiredStateEntities.get(cluster.getClusterId()).get(s.getName()).get(sc.getName()));
if (request.getDesiredState() != null
&& !request.getDesiredState().isEmpty()) {
State state = State.valueOf(request.getDesiredState());
@@ -1205,6 +1229,11 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
private Set<ServiceComponentHostResponse> getHostComponents(
ServiceComponentHostRequest request) throws AmbariException {
+ return getHostComponents(request, false);
+ }
+
+ private Set<ServiceComponentHostResponse> getHostComponents(
+ ServiceComponentHostRequest request, boolean statusOnly) throws AmbariException {
LOG.debug("Processing request {}", request);
if (request.getClusterName() == null
@@ -1348,7 +1377,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
}
- ServiceComponentHostResponse r = sch.convertToResponse(desiredConfigs);
+ ServiceComponentHostResponse r = statusOnly ? sch.convertToResponseStatusOnly(desiredConfigs,
+ filterBasedConfigStaleness)
+ : sch.convertToResponse(desiredConfigs);
if (null == r || (filterBasedConfigStaleness && r.isStaleConfig() != staleConfig)) {
continue;
}
@@ -1404,7 +1435,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
}
- ServiceComponentHostResponse r = sch.convertToResponse(desiredConfigs);
+ ServiceComponentHostResponse r = statusOnly ? sch.convertToResponseStatusOnly(desiredConfigs,
+ filterBasedConfigStaleness)
+ : sch.convertToResponse(desiredConfigs);
if (null == r || (filterBasedConfigStaleness && r.isStaleConfig() != staleConfig)) {
continue;
}
@@ -3859,12 +3892,18 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
@Override
public Set<ServiceComponentHostResponse> getHostComponents(
Set<ServiceComponentHostRequest> requests) throws AmbariException {
+ return getHostComponents(requests, false);
+ }
+
+ @Override
+ public Set<ServiceComponentHostResponse> getHostComponents(
+ Set<ServiceComponentHostRequest> requests, boolean statusOnly) throws AmbariException {
LOG.debug("Processing requests: {}", requests);
Set<ServiceComponentHostResponse> response =
new HashSet<>();
for (ServiceComponentHostRequest request : requests) {
try {
- response.addAll(getHostComponents(request));
+ response.addAll(getHostComponents(request, statusOnly));
} catch (ServiceComponentHostNotFoundException | ServiceComponentNotFoundException | ServiceNotFoundException e) {
if (requests.size() == 1) {
// only throw exception if 1 request.
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index 4143beb..c749846 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -134,6 +134,7 @@ import org.eclipse.jetty.server.SessionIdManager;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.server.session.DefaultSessionIdManager;
import org.eclipse.jetty.server.session.SessionHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -455,7 +456,14 @@ public class AmbariServer {
enableLog4jMonitor(configsMap);
- handlerList.addHandler(root);
+ GzipHandler gzipHandler = new GzipHandler();
+ gzipHandler.setHandler(root);
+
+ //TODO minimal set; some other mime types may need to be added
+ gzipHandler.setIncludedMimeTypes("text/html", "text/plain", "text/xml", "text/css", "application/javascript",
+ "application/x-javascript", "application/xml", "application/x-www-form-urlencoded", "application/json");
+ handlerList.addHandler(gzipHandler);
+
server.setHandler(handlerList);
ServletHolder agent = new ServletHolder(ServletContainer.class);
@@ -512,7 +520,8 @@ public class AmbariServer {
LOG.info("********* Initializing Clusters **********");
Clusters clusters = injector.getInstance(Clusters.class);
StringBuilder clusterDump = new StringBuilder();
- clusters.debugDump(clusterDump);
+ //TODO temporarily commented out because it takes a lot of time on a 5k cluster
+ //clusters.debugDump(clusterDump);
LOG.info("********* Current Clusters State *********");
LOG.info(clusterDump.toString());
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
index c36e107..1fd6697 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
@@ -541,7 +541,7 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
recoveryConfigHelper.getRecoveryConfig(clusters.getCluster(hostRequest.getClusterName()).getClusterName(),
addedHost.getHostName())
));
- hostLevelParamsUpdateEvent.setHostName(addedHost.getHostName());
+ hostLevelParamsUpdateEvent.setHostId(addedHost.getHostId());
hostLevelParamsUpdateEvents.add(hostLevelParamsUpdateEvent);
}
}
@@ -550,12 +550,12 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
// TODO add rack change to topology update
updateHostRackInfoIfChanged(clusters, hostRequests);
- TopologyUpdateEvent topologyUpdateEvent =
- new TopologyUpdateEvent(addedTopologies, TopologyUpdateEvent.EventType.UPDATE);
- topologyHolder.updateData(topologyUpdateEvent);
for (HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent : hostLevelParamsUpdateEvents) {
hostLevelParamsHolder.updateData(hostLevelParamsUpdateEvent);
}
+ TopologyUpdateEvent topologyUpdateEvent =
+ new TopologyUpdateEvent(addedTopologies, TopologyUpdateEvent.EventType.UPDATE);
+ topologyHolder.updateData(topologyUpdateEvent);
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java
index 5e02a64..3bad54c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/DefaultServiceCalculatedState.java
@@ -92,7 +92,7 @@ public class DefaultServiceCalculatedState implements ServiceCalculatedState {
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
State masterState = null;
State clientState = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java
index ca4fe6e..6b22d3b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/FlumeServiceCalculatedState.java
@@ -50,7 +50,7 @@ public final class FlumeServiceCalculatedState extends DefaultServiceCalculatedS
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
State state = State.UNKNOWN;
for (ServiceComponentHostResponse schr : hostComponentResponses) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java
index eac0dce..84b805a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HBaseServiceCalculatedState.java
@@ -53,7 +53,7 @@ public final class HBaseServiceCalculatedState extends DefaultServiceCalculatedS
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
int hBaseMasterActiveCount = 0;
State nonStartedState = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java
index 7bbad2a..0db8d8a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HDFSServiceCalculatedState.java
@@ -53,7 +53,7 @@ public final class HDFSServiceCalculatedState extends DefaultServiceCalculatedSt
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
int nameNodeCount = 0;
int nameNodeActiveCount = 0;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java
index 77e44a5..756d19e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/HiveServiceCalculatedState.java
@@ -54,7 +54,7 @@ public final class HiveServiceCalculatedState extends DefaultServiceCalculatedSt
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
int activeHiveMetastoreComponentCount = 0;
State nonStartedState = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java
index 1803f70..d43375c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/OozieServiceCalculatedState.java
@@ -54,7 +54,7 @@ public final class OozieServiceCalculatedState extends DefaultServiceCalculatedS
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
int oozieServerActiveCount = 0;
State nonStartedState = null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java
index 2f1619f..36c2421 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/state/YARNServiceCalculatedState.java
@@ -54,7 +54,7 @@ public final class YARNServiceCalculatedState extends DefaultServiceCalculatedSt
serviceName, null, null, null);
Set<ServiceComponentHostResponse> hostComponentResponses =
- managementControllerProvider.get().getHostComponents(Collections.singleton(request));
+ managementControllerProvider.get().getHostComponents(Collections.singleton(request), true);
int resourceManagerActiveCount = 0;
boolean isAppTimeLineServerActive = false;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
index f0f17e1..89fc8bc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentConfigsUpdateEvent.java
@@ -42,7 +42,7 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
/**
* Host identifier.
*/
- private String hostName;
+ private Long hostId;
/**
* Configs grouped by cluster id as keys.
@@ -64,17 +64,17 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
this.hash = hash;
}
- public static AgentConfigsUpdateEvent emptyUpdate() {
- return new AgentConfigsUpdateEvent(null);
+ public void setHostId(Long hostId) {
+ this.hostId = hostId;
}
- public void setHostName(String hostName) {
- this.hostName = hostName;
+ @Override
+ public Long getHostId() {
+ return hostId;
}
- @Override
- public String getHostName() {
- return hostName;
+ public static AgentConfigsUpdateEvent emptyUpdate() {
+ return new AgentConfigsUpdateEvent(null);
}
@Override
@@ -84,12 +84,12 @@ public class AgentConfigsUpdateEvent extends AmbariHostUpdateEvent implements Ha
AgentConfigsUpdateEvent that = (AgentConfigsUpdateEvent) o;
- return Objects.equals(hostName, that.hostName) &&
+ return Objects.equals(hostId, that.hostId) &&
Objects.equals(clustersConfigs, that.clustersConfigs);
}
@Override
public int hashCode() {
- return Objects.hash(hostName, clustersConfigs);
+ return Objects.hash(hostId, clustersConfigs);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java
index cc0b5fb..0bae130 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsUpdateEvent.java
@@ -37,17 +37,19 @@ public class AlertDefinitionsUpdateEvent extends AmbariHostUpdateEvent implement
private final Map<Long, AlertCluster> clusters;
private final EventType eventType;
private final String hostName;
+ private final Long hostId;
private String hash;
public static AlertDefinitionsUpdateEvent emptyEvent() {
- return new AlertDefinitionsUpdateEvent(null, null, null);
+ return new AlertDefinitionsUpdateEvent(null, null, null, null);
}
- public AlertDefinitionsUpdateEvent(EventType eventType, Map<Long, AlertCluster> clusters, String hostName) {
+ public AlertDefinitionsUpdateEvent(EventType eventType, Map<Long, AlertCluster> clusters, String hostName, Long hostId) {
super(Type.ALERT_DEFINITIONS);
this.eventType = eventType;
this.clusters = clusters != null ? Collections.unmodifiableMap(clusters) : null;
this.hostName = hostName;
+ this.hostId = hostId;
}
@Override
@@ -61,7 +63,6 @@ public class AlertDefinitionsUpdateEvent extends AmbariHostUpdateEvent implement
this.hash = hash;
}
- @Override
@JsonProperty("hostName")
public String getHostName() {
return hostName;
@@ -93,6 +94,11 @@ public class AlertDefinitionsUpdateEvent extends AmbariHostUpdateEvent implement
return Objects.hash(eventType, clusters);
}
+ @Override
+ public Long getHostId() {
+ return hostId;
+ }
+
public enum EventType {
/** Full current alert definitions */
CREATE,
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java
index 495e4c8..fe49906 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariHostUpdateEvent.java
@@ -25,11 +25,11 @@ import java.beans.Transient;
public abstract class AmbariHostUpdateEvent extends AmbariUpdateEvent {
/**
- * Host name message will sent to.
- * @return host name.
+ * Id of the host the message will be sent to.
+ * @return host id.
*/
@Transient
- public abstract String getHostName();
+ public abstract Long getHostId();
public AmbariHostUpdateEvent(Type type) {
super(type);
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
index eefca6b..af4a9af 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ConfigsUpdateEvent.java
@@ -246,43 +246,4 @@ public class ConfigsUpdateEvent extends AmbariUpdateEvent {
return result;
}
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ConfigsUpdateEvent that = (ConfigsUpdateEvent) o;
-
- if (serviceConfigId != null ? !serviceConfigId.equals(that.serviceConfigId) : that.serviceConfigId != null)
- return false;
- if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false;
- if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false;
- if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) return false;
- if (version != null ? !version.equals(that.version) : that.version != null) return false;
- if (user != null ? !user.equals(that.user) : that.user != null) return false;
- if (note != null ? !note.equals(that.note) : that.note != null) return false;
- if (hostNames != null ? !hostNames.equals(that.hostNames) : that.hostNames != null) return false;
- if (createTime != null ? !createTime.equals(that.createTime) : that.createTime != null) return false;
- if (groupName != null ? !groupName.equals(that.groupName) : that.groupName != null) return false;
- if (configs != null ? !configs.equals(that.configs) : that.configs != null) return false;
- return changedConfigTypes != null ? changedConfigTypes.equals(that.changedConfigTypes) : that.changedConfigTypes == null;
- }
-
- @Override
- public int hashCode() {
- int result = serviceConfigId != null ? serviceConfigId.hashCode() : 0;
- result = 31 * result + (clusterId != null ? clusterId.hashCode() : 0);
- result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0);
- result = 31 * result + (groupId != null ? groupId.hashCode() : 0);
- result = 31 * result + (version != null ? version.hashCode() : 0);
- result = 31 * result + (user != null ? user.hashCode() : 0);
- result = 31 * result + (note != null ? note.hashCode() : 0);
- result = 31 * result + (hostNames != null ? hostNames.hashCode() : 0);
- result = 31 * result + (createTime != null ? createTime.hashCode() : 0);
- result = 31 * result + (groupName != null ? groupName.hashCode() : 0);
- result = 31 * result + (configs != null ? configs.hashCode() : 0);
- result = 31 * result + (changedConfigTypes != null ? changedConfigTypes.hashCode() : 0);
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
index 8f8b5a0..c97ed60 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ExecutionCommandEvent.java
@@ -31,9 +31,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
/**
- * Host name with agent execution commands will be send to.
+ * Id of the host whose agent the execution commands will be sent to.
*/
- private String hostName;
+ private Long hostId;
/**
* Execution commands grouped by cluster id.
@@ -54,15 +54,6 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
this.clusters = clusters;
}
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- @Override
- public String getHostName() {
- return hostName;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -70,14 +61,23 @@ public class ExecutionCommandEvent extends AmbariHostUpdateEvent {
ExecutionCommandEvent that = (ExecutionCommandEvent) o;
- if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false;
+ if (hostId != null ? !hostId.equals(that.hostId) : that.hostId != null) return false;
return clusters != null ? clusters.equals(that.clusters) : that.clusters == null;
}
@Override
public int hashCode() {
- int result = hostName != null ? hostName.hashCode() : 0;
+ int result = hostId != null ? hostId.hashCode() : 0;
result = 31 * result + (clusters != null ? clusters.hashCode() : 0);
return result;
}
+
+ public void setHostId(Long hostId) {
+ this.hostId = hostId;
+ }
+
+ @Override
+ public Long getHostId() {
+ return hostId;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
index d68e802..8948391 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
@@ -42,7 +42,7 @@ public class HostLevelParamsUpdateEvent extends AmbariHostUpdateEvent implements
/**
* Host identifier.
*/
- private String hostName;
+ private Long hostId;
/**
* Host level parameters by clusters.
@@ -73,13 +73,13 @@ public class HostLevelParamsUpdateEvent extends AmbariHostUpdateEvent implements
return new HostLevelParamsUpdateEvent(null);
}
- public void setHostName(String hostName) {
- this.hostName = hostName;
+ public void setHostId(Long hostId) {
+ this.hostId = hostId;
}
@Override
- public String getHostName() {
- return hostName;
+ public Long getHostId() {
+ return hostId;
}
@Override
@@ -89,12 +89,12 @@ public class HostLevelParamsUpdateEvent extends AmbariHostUpdateEvent implements
HostLevelParamsUpdateEvent that = (HostLevelParamsUpdateEvent) o;
- return Objects.equals(hostName, that.hostName) &&
+ return Objects.equals(hostId, that.hostId) &&
Objects.equals(hostLevelParamsClusters, that.hostLevelParamsClusters);
}
@Override
public int hashCode() {
- return Objects.hash(hostName, hostLevelParamsClusters);
+ return Objects.hash(hostId, hostLevelParamsClusters);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
index d39c4ae..a458ea3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
@@ -90,17 +90,13 @@ public class ServiceUpdateEvent extends AmbariUpdateEvent {
ServiceUpdateEvent that = (ServiceUpdateEvent) o;
if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false;
- if (maintenanceState != that.maintenanceState) return false;
- if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false;
- return state == that.state;
+ return serviceName != null ? serviceName.equals(that.serviceName) : that.serviceName == null;
}
@Override
public int hashCode() {
int result = clusterName != null ? clusterName.hashCode() : 0;
- result = 31 * result + (maintenanceState != null ? maintenanceState.hashCode() : 0);
result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0);
- result = 31 * result + (state != null ? state.hashCode() : 0);
return result;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
index 548ea41..17e8c3d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
@@ -57,9 +57,9 @@ public class StateUpdateListener {
String destination = event.getDestination();
if (event instanceof AmbariHostUpdateEvent) {
AmbariHostUpdateEvent hostUpdateEvent = (AmbariHostUpdateEvent) event;
- String hostName = hostUpdateEvent.getHostName();
- String sessionId = agentSessionManager.getSessionId(hostName);
- LOG.debug("Received status update event {} for host {} registered with session ID {}", hostUpdateEvent, hostName, sessionId);
+ Long hostId = hostUpdateEvent.getHostId();
+ String sessionId = agentSessionManager.getSessionId(hostId);
+ LOG.debug("Received status update event {} for host {} registered with session ID {}", hostUpdateEvent, hostId, sessionId);
MessageHeaders headers = createHeaders(sessionId);
simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers);
} else {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
index 24c8166..50dc144 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -18,7 +18,9 @@
package org.apache.ambari.server.events.listeners.services;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.EagerSingleton;
@@ -63,23 +65,26 @@ public class ServiceUpdateListener {
@Subscribe
public void onHostComponentUpdate(HostComponentsUpdateEvent event) throws AmbariException {
+ Map<Long, Set<String>> clustersServices = new HashMap<>();
for (HostComponentUpdate hostComponentUpdate : event.getHostComponentUpdates()) {
Long clusterId = hostComponentUpdate.getClusterId();
- String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
String serviceName = hostComponentUpdate.getServiceName();
+ clustersServices.computeIfAbsent(clusterId, c -> new HashSet<>()).add(serviceName);
+ }
+ for (Map.Entry<Long, Set<String>> clusterServices : clustersServices.entrySet()) {
+ Long clusterId = clusterServices.getKey();
+ String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
+ for (String serviceName : clusterServices.getValue()) {
+ ServiceCalculatedState serviceCalculatedState = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
+ State serviceState = serviceCalculatedState.getState(clusterName, serviceName);
- ServiceCalculatedState serviceCalculatedState = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
- State serviceState = serviceCalculatedState.getState(clusterName, serviceName);
-
- // retrieve state from cache
- if (states.containsKey(clusterId) && states.get(clusterId).containsKey(serviceName) && states.get(clusterId).get(serviceName).equals(serviceState)) {
- continue;
- }
- if (!states.containsKey(clusterId)) {
- states.put(clusterId, new HashMap<>());
+ // retrieve state from cache
+ if (states.containsKey(clusterId) && states.get(clusterId).containsKey(serviceName) && states.get(clusterId).get(serviceName).equals(serviceState)) {
+ continue;
+ }
+ states.computeIfAbsent(clusterId, c -> new HashMap<>()).put(serviceName, serviceState);
+ stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, serviceState));
}
- states.get(clusterId).put(serviceName, serviceState);
- stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, serviceState));
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 1dd8a5b..58bfcfc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -42,17 +42,12 @@ import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.TaskUpdateEvent;
import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
import org.apache.ambari.server.events.publishers.TaskEventPublisher;
-import org.apache.ambari.server.orm.dao.ClusterDAO;
-import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.RequestDAO;
import org.apache.ambari.server.orm.dao.StageDAO;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
-import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.StageEntityPK;
-import org.apache.ambari.server.topology.TopologyManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,26 +95,14 @@ public class TaskStatusListener {
private RequestDAO requestDAO;
- private HostRoleCommandDAO hostRoleCommandDAO;
-
- private TopologyManager topologyManager;
-
private StateUpdateEventPublisher stateUpdateEventPublisher;
- private ClusterDAO clusterDAO;
-
@Inject
public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO,
- StateUpdateEventPublisher stateUpdateEventPublisher,
- HostRoleCommandDAO hostRoleCommandDAO,
- TopologyManager topologyManager,
- ClusterDAO clusterDAO) {
+ StateUpdateEventPublisher stateUpdateEventPublisher) {
this.stageDAO = stageDAO;
this.requestDAO = requestDAO;
this.stateUpdateEventPublisher = stateUpdateEventPublisher;
- this.hostRoleCommandDAO = hostRoleCommandDAO;
- this.topologyManager = topologyManager;
- this.clusterDAO = clusterDAO;
taskEventPublisher.register(this);
}
@@ -307,23 +290,13 @@ public class TaskStatusListener {
* @param requestIdsWithReceivedTaskStatus set of request ids that has received tasks status
* @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
*/
- private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) throws ClusterNotFoundException {
+ private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) {
for (Long reportedRequestId : requestIdsWithReceivedTaskStatus) {
if (activeRequestMap.containsKey(reportedRequestId)) {
ActiveRequest request = activeRequestMap.get(reportedRequestId);
Boolean didStatusChange = updateRequestStatus(reportedRequestId, stagesWithChangedTaskStatus);
if (didStatusChange) {
- RequestEntity updated = requestDAO.updateStatus(reportedRequestId, request.getStatus(), request.getDisplayStatus());
- ClusterEntity clusterEntity = clusterDAO.findById(updated.getClusterId());
- if (clusterEntity == null) {
- throw new ClusterNotFoundException(updated.getClusterId());
- }
- List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(updated.getRequestId());
- stateUpdateEventPublisher.publish(new RequestUpdateEvent(updated,
- hostRoleCommandDAO,
- topologyManager,
- clusterEntity.getClusterName(),
- hostRoleCommandEntities));
+ requestDAO.updateStatus(reportedRequestId, request.getStatus(), request.getDisplayStatus());
}
if (request.isCompleted() && isAllTasksCompleted(reportedRequestId)) {
// Request is considered ton have been finished if request status and all of it's tasks status are completed
http://git-wip-us.apache.org/repos/asf/ambari/blob/6ecac18c/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
index e09d5ca..bacdc2d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
@@ -70,31 +70,31 @@ public class AgentCommandsPublisher {
@Inject
private StateUpdateEventPublisher stateUpdateEventPublisher;
- public void sendAgentCommand(Multimap<String, AgentCommand> agentCommands) throws AmbariException {
+ public void sendAgentCommand(Multimap<Long, AgentCommand> agentCommands) throws AmbariException {
if (agentCommands != null && !agentCommands.isEmpty()) {
- Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters = new TreeMap<>();
- for (Map.Entry<String, AgentCommand> acHostEntry : agentCommands.entries()) {
- String hostName = acHostEntry.getKey();
+ Map<Long, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters = new TreeMap<>();
+ for (Map.Entry<Long, AgentCommand> acHostEntry : agentCommands.entries()) {
+ Long hostId = acHostEntry.getKey();
AgentCommand ac = acHostEntry.getValue();
- populateExecutionCommandsClusters(executionCommandsClusters, hostName, ac);
+ populateExecutionCommandsClusters(executionCommandsClusters, hostId, ac);
}
- for (Map.Entry<String, TreeMap<String, ExecutionCommandsCluster>> hostEntry : executionCommandsClusters.entrySet()) {
- String hostName = hostEntry.getKey();
+ for (Map.Entry<Long, TreeMap<String, ExecutionCommandsCluster>> hostEntry : executionCommandsClusters.entrySet()) {
+ Long hostId = hostEntry.getKey();
ExecutionCommandEvent executionCommandEvent = new ExecutionCommandEvent(hostEntry.getValue());
- executionCommandEvent.setHostName(hostName);
+ executionCommandEvent.setHostId(hostId);
stateUpdateEventPublisher.publish(executionCommandEvent);
}
}
}
- public void sendAgentCommand(String hostName, AgentCommand agentCommand) throws AmbariException {
- Multimap<String, AgentCommand> agentCommands = ArrayListMultimap.create();
- agentCommands.put(hostName, agentCommand);
+ public void sendAgentCommand(Long hostId, AgentCommand agentCommand) throws AmbariException {
+ Multimap<Long, AgentCommand> agentCommands = ArrayListMultimap.create();
+ agentCommands.put(hostId, agentCommand);
sendAgentCommand(agentCommands);
}
- private void populateExecutionCommandsClusters(Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
- String hostName, AgentCommand ac) throws AmbariException {
+ private void populateExecutionCommandsClusters(Map<Long, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
+ Long hostId, AgentCommand ac) throws AmbariException {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
@@ -114,7 +114,7 @@ public class AgentCommandsPublisher {
if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
LOG.info(String.format("%s called", customCommand));
try {
- injectKeytab(ec, customCommand, hostName);
+ injectKeytab(ec, customCommand, clusters.getHostById(hostId).getHostName());
} catch (IOException e) {
throw new AmbariException("Could not inject keytab into command", e);
}
@@ -126,15 +126,15 @@ public class AgentCommandsPublisher {
clusterId = Long.toString(clusters.getCluster(clusterName).getClusterId());
}
ec.setClusterId(clusterId);
- prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId);
- executionCommandsClusters.get(hostName).get(clusterId).getExecutionCommands().add((ExecutionCommand) ac);
+ prepareExecutionCommandsClusters(executionCommandsClusters, hostId, clusterId);
+ executionCommandsClusters.get(hostId).get(clusterId).getExecutionCommands().add((ExecutionCommand) ac);
break;
}
case CANCEL_COMMAND: {
CancelCommand cc = (CancelCommand) ac;
String clusterId = Long.toString(hostRoleCommandDAO.findByPK(cc.getTargetTaskId()).getStage().getClusterId());
- prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId);
- executionCommandsClusters.get(hostName).get(clusterId).getCancelCommands().add(cc);
+ prepareExecutionCommandsClusters(executionCommandsClusters, hostId, clusterId);
+ executionCommandsClusters.get(hostId).get(clusterId).getCancelCommands().add(cc);
break;
}
default:
@@ -143,13 +143,13 @@ public class AgentCommandsPublisher {
}
}
- private void prepareExecutionCommandsClusters(Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
- String hostName, String clusterId) {
- if (!executionCommandsClusters.containsKey(hostName)) {
- executionCommandsClusters.put(hostName, new TreeMap<>());
+ private void prepareExecutionCommandsClusters(Map<Long, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
+ Long hostId, String clusterId) {
+ if (!executionCommandsClusters.containsKey(hostId)) {
+ executionCommandsClusters.put(hostId, new TreeMap<>());
}
- if (!executionCommandsClusters.get(hostName).containsKey(clusterId)) {
- executionCommandsClusters.get(hostName).put(clusterId, new ExecutionCommandsCluster(new ArrayList<>(),
+ if (!executionCommandsClusters.get(hostId).containsKey(clusterId)) {
+ executionCommandsClusters.get(hostId).put(clusterId, new ExecutionCommandsCluster(new ArrayList<>(),
new ArrayList<>()));
}
}