You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/05/18 18:20:57 UTC
[ambari] branch trunk updated: AMBARI-23898. Server posts numerous
redundant metadata updates to agent. (#1325)
This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4bc80a AMBARI-23898. Server posts numerous redundant metadata updates to agent. (#1325)
e4bc80a is described below
commit e4bc80afe44ba067a460acd32e02728244607b14
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Fri May 18 21:20:54 2018 +0300
AMBARI-23898. Server posts numerous redundant metadata updates to agent. (#1325)
* AMBARI-23898. Server posts numerous redundant metadata updates to agent. (mpapirkovskyy)
* AMBARI-23898. Server posts numerous redundant metadata updates to agent. (mpapirkovskyy)
---
.../apache/ambari/server/agent/RecoveryConfig.java | 28 +++++++++++++++++
.../server/agent/stomp/HostLevelParamsHolder.java | 35 ++++++++++++++++++++--
.../ambari/server/agent/stomp/MetadataHolder.java | 18 ++++-------
.../server/agent/stomp/dto/HostRepositories.java | 16 +++++-----
.../server/agent/stomp/dto/MetadataCluster.java | 29 ++++++++++++++++++
.../ambari/server/configuration/Configuration.java | 4 +--
.../controller/AmbariManagementControllerImpl.java | 31 +++----------------
.../server/events/HostLevelParamsUpdateEvent.java | 4 +++
8 files changed, 112 insertions(+), 53 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
index 8f909cd..8e2078d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
@@ -105,6 +105,34 @@ public class RecoveryConfig {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RecoveryConfig that = (RecoveryConfig) o;
+
+ if (type != null ? !type.equals(that.type) : that.type != null) return false;
+ if (maxCount != null ? !maxCount.equals(that.maxCount) : that.maxCount != null) return false;
+ if (windowInMinutes != null ? !windowInMinutes.equals(that.windowInMinutes) : that.windowInMinutes != null)
+ return false;
+ if (retryGap != null ? !retryGap.equals(that.retryGap) : that.retryGap != null) return false;
+ if (maxLifetimeCount != null ? !maxLifetimeCount.equals(that.maxLifetimeCount) : that.maxLifetimeCount != null)
+ return false;
+ return enabledComponents != null ? enabledComponents.equals(that.enabledComponents) : that.enabledComponents == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = type != null ? type.hashCode() : 0;
+ result = 31 * result + (maxCount != null ? maxCount.hashCode() : 0);
+ result = 31 * result + (windowInMinutes != null ? windowInMinutes.hashCode() : 0);
+ result = 31 * result + (retryGap != null ? retryGap.hashCode() : 0);
+ result = 31 * result + (maxLifetimeCount != null ? maxLifetimeCount.hashCode() : 0);
+ result = 31 * result + (enabledComponents != null ? enabledComponents.hashCode() : 0);
+ return result;
+ }
+
+ @Override
public String toString() {
StringBuilder buffer = new StringBuilder("RecoveryConfig{");
buffer.append(", type=").append(type);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
index abd78c8..fbd26dd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.agent.stomp;
import java.util.Collection;
+import java.util.Map;
import java.util.TreeMap;
import org.apache.ambari.server.AmbariException;
@@ -30,6 +31,7 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
+import org.apache.commons.collections.MapUtils;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
@@ -78,9 +80,36 @@ public class HostLevelParamsHolder extends AgentHostDataHolder<HostLevelParamsUp
}
protected boolean handleUpdate(HostLevelParamsUpdateEvent update) throws AmbariException {
- //TODO implement update host level params process
- setData(update, update.getHostId());
- return true;
+ boolean changed = false;
+ if (MapUtils.isNotEmpty(update.getHostLevelParamsClusters())) {
+ Long hostId = update.getHostId();
+ for (Map.Entry<String, HostLevelParamsCluster> hostLevelParamsClusterEntry : update.getHostLevelParamsClusters().entrySet()) {
+ HostLevelParamsCluster updatedCluster = hostLevelParamsClusterEntry.getValue();
+ String clusterId = hostLevelParamsClusterEntry.getKey();
+ Map<String, HostLevelParamsCluster> clusters = getData().get(hostId).getHostLevelParamsClusters();
+ if (clusters.containsKey(clusterId)) {
+ HostLevelParamsCluster cluster = clusters.get(clusterId);
+ if (!cluster.getRecoveryConfig().equals(updatedCluster.getRecoveryConfig())) {
+ cluster.setRecoveryConfig(updatedCluster.getRecoveryConfig());
+ changed = true;
+ }
+ if (!cluster.getHostRepositories().getRepositories()
+ .equals(updatedCluster.getHostRepositories().getRepositories())) {
+ cluster.getHostRepositories().setRepositories(updatedCluster.getHostRepositories().getRepositories());
+ changed = true;
+ }
+ if (!cluster.getHostRepositories().getComponentRepos()
+ .equals(updatedCluster.getHostRepositories().getComponentRepos())) {
+ cluster.getHostRepositories().setComponentRepos(updatedCluster.getHostRepositories().getComponentRepos());
+ changed = true;
+ }
+ } else {
+ clusters.put(clusterId, updatedCluster);
+ changed = true;
+ }
+ }
+ }
+ return changed;
}
@Override
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
index b3558cd..3d8ee35 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/MetadataHolder.java
@@ -26,7 +26,6 @@ import org.apache.ambari.server.agent.stomp.dto.MetadataCluster;
import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
import org.apache.ambari.server.events.AmbariPropertiesChangedEvent;
import org.apache.ambari.server.events.ClusterComponentsRepoChangedEvent;
-import org.apache.ambari.server.events.ClusterConfigChangedEvent;
import org.apache.ambari.server.events.MetadataUpdateEvent;
import org.apache.ambari.server.events.ServiceCredentialStoreUpdateEvent;
import org.apache.ambari.server.events.ServiceInstalledEvent;
@@ -34,6 +33,7 @@ import org.apache.ambari.server.events.UpdateEventType;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import com.google.common.eventbus.Subscribe;
@@ -85,15 +85,14 @@ public class MetadataHolder extends AgentClusterDataHolder<MetadataUpdateEvent>
changed = true;
} else {
MetadataCluster cluster = clusters.get(clusterId);
- if (!cluster.getClusterLevelParams().equals(updatedCluster.getClusterLevelParams())) {
- cluster.getClusterLevelParams().putAll(updatedCluster.getClusterLevelParams());
+ if (cluster.updateClusterLevelParams(updatedCluster.getClusterLevelParams())) {
changed = true;
}
- if (!cluster.getServiceLevelParams().equals(updatedCluster.getServiceLevelParams())) {
- cluster.getServiceLevelParams().putAll(updatedCluster.getServiceLevelParams());
+ if (cluster.updateServiceLevelParams(updatedCluster.getServiceLevelParams())) {
changed = true;
}
- if (!cluster.getStatusCommandsToRun().equals(updatedCluster.getStatusCommandsToRun())) {
+ if (CollectionUtils.isNotEmpty(updatedCluster.getStatusCommandsToRun())
+ && !cluster.getStatusCommandsToRun().containsAll(updatedCluster.getStatusCommandsToRun())) {
cluster.getStatusCommandsToRun().addAll(updatedCluster.getStatusCommandsToRun());
changed = true;
}
@@ -117,13 +116,6 @@ public class MetadataHolder extends AgentClusterDataHolder<MetadataUpdateEvent>
}
@Subscribe
- public void onConfigsChange(ClusterConfigChangedEvent configChangedEvent) throws AmbariException {
- Cluster cluster = m_clusters.get().getCluster(configChangedEvent.getClusterName());
- updateData(ambariManagementController.getClusterMetadataOnConfigsUpdate(cluster));
-
- }
-
- @Subscribe
public void onServiceCreate(ServiceInstalledEvent serviceInstalledEvent) throws AmbariException {
Cluster cluster = m_clusters.get().getCluster(serviceInstalledEvent.getClusterId());
updateData(ambariManagementController.getClusterMetadataOnServiceInstall(cluster, serviceInstalledEvent.getServiceName()));
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
index 37d1146..1e63812 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
@@ -19,7 +19,7 @@
package org.apache.ambari.server.agent.stomp.dto;
-import java.util.Map;
+import java.util.SortedMap;
import org.apache.ambari.server.agent.CommandRepository;
@@ -30,29 +30,29 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class HostRepositories {
@JsonProperty("commandRepos")
- private Map<Long, CommandRepository> repositories;
+ private SortedMap<Long, CommandRepository> repositories;
@JsonProperty("componentRepos")
- private Map<String, Long> componentRepos;
+ private SortedMap<String, Long> componentRepos;
- public HostRepositories(Map<Long, CommandRepository> repositories, Map<String, Long> componentRepos) {
+ public HostRepositories(SortedMap<Long, CommandRepository> repositories, SortedMap<String, Long> componentRepos) {
this.repositories = repositories;
this.componentRepos = componentRepos;
}
- public Map<Long, CommandRepository> getRepositories() {
+ public SortedMap<Long, CommandRepository> getRepositories() {
return repositories;
}
- public void setRepositories(Map<Long, CommandRepository> repositories) {
+ public void setRepositories(SortedMap<Long, CommandRepository> repositories) {
this.repositories = repositories;
}
- public Map<String, Long> getComponentRepos() {
+ public SortedMap<String, Long> getComponentRepos() {
return componentRepos;
}
- public void setComponentRepos(Map<String, Long> componentRepos) {
+ public void setComponentRepos(SortedMap<String, Long> componentRepos) {
this.componentRepos = componentRepos;
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
index bb3604e..b22ee60 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/MetadataCluster.java
@@ -24,6 +24,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.ambari.server.state.SecurityType;
+import org.apache.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -78,6 +79,34 @@ public class MetadataCluster {
this.agentConfigs = agentConfigs;
}
+ public boolean updateServiceLevelParams(SortedMap<String, MetadataServiceInfo> update) {
+ boolean changed = false;
+ for (String key : update.keySet()) {
+ if (!serviceLevelParams.containsKey(key) || !serviceLevelParams.get(key).equals(update.get(key))) {
+ changed = true;
+ break;
+ }
+ }
+ if (changed) {
+ serviceLevelParams.putAll(update);
+ }
+ return changed;
+ }
+
+ public boolean updateClusterLevelParams(SortedMap<String, String> update) {
+ boolean changed = false;
+ for (String key : update.keySet()) {
+ if (!clusterLevelParams.containsKey(key) || !StringUtils.equals(clusterLevelParams.get(key), update.get(key))) {
+ changed = true;
+ break;
+ }
+ }
+ if (changed) {
+ clusterLevelParams.putAll(update);
+ }
+ return changed;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index badb79e..91eafe5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1802,7 +1802,7 @@ public class Configuration {
*/
@Markdown(description = "Thread pool size for spring messaging")
public static final ConfigurationProperty<Integer> MESSAGING_THREAD_POOL_SIZE = new ConfigurationProperty<>(
- "messaging.threadpool.size", 1);
+ "messaging.threadpool.size", 10);
/**
* The thread pool size for agents registration.
@@ -4512,7 +4512,7 @@ public class Configuration {
}
/**
- * @return max thread pool size for clients, default 25
+ * @return max thread pool size for clients, default 10
*/
public int getSpringMessagingThreadPoolSize() {
return Integer.parseInt(getProperty(MESSAGING_THREAD_POOL_SIZE));
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 0c22b42..64360d7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -5779,31 +5779,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
clusterLevelParams.put(STACK_NAME, stackId.getStackName());
clusterLevelParams.put(STACK_VERSION, stackId.getStackVersion());
- Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
- if (MapUtils.isNotEmpty(desiredConfigs)) {
-
- Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredConfigs);
- String userList = gson.toJson(userSet);
- clusterLevelParams.put(USER_LIST, userList);
-
- //Create a user_group mapping and send it as part of the hostLevelParams
- Map<String, Set<String>> userGroupsMap = configHelper.createUserGroupsMap(
- stackId, cluster, desiredConfigs);
- String userGroups = gson.toJson(userGroupsMap);
- clusterLevelParams.put(USER_GROUPS, userGroups);
-
- Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredConfigs);
- String groupList = gson.toJson(groupSet);
- clusterLevelParams.put(GROUP_LIST, groupList);
- }
- Set<String> notManagedHdfsPathSet = configHelper.getPropertyValuesWithPropertyType(stackId,
- PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredConfigs);
- String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet);
- clusterLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList);
-
+ clusterLevelParams.putAll(getMetadataClusterLevelConfigsParams(cluster, stackId));
clusterLevelParams.put(CLUSTER_NAME, cluster.getClusterName());
-
- StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
clusterLevelParams.put(HOOKS_FOLDER, configs.getProperty(Configuration.HOOKS_FOLDER));
return clusterLevelParams;
@@ -5812,7 +5789,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
public TreeMap<String, String> getMetadataClusterLevelConfigsParams(Cluster cluster, StackId stackId) throws AmbariException {
TreeMap<String, String> clusterLevelParams = new TreeMap<>();
- Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
+ Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs(false);
if (MapUtils.isNotEmpty(desiredConfigs)) {
Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredConfigs);
@@ -5928,8 +5905,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
@Override
public HostRepositories retrieveHostRepositories(Cluster cluster, Host host) throws AmbariException {
List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(host.getHostName());
- Map<Long, CommandRepository> hostRepositories = new HashMap<>();
- Map<String, Long> componentsRepos = new HashMap<>();
+ SortedMap<Long, CommandRepository> hostRepositories = new TreeMap<>();
+ SortedMap<String, Long> componentsRepos = new TreeMap<>();
for (ServiceComponentHost serviceComponentHost : hostComponents) {
CommandRepository commandRepository;
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
index d2fc257..facce46 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostLevelParamsUpdateEvent.java
@@ -82,6 +82,10 @@ public class HostLevelParamsUpdateEvent extends STOMPHostEvent implements Hashab
return hostId;
}
+ public Map<String, HostLevelParamsCluster> getHostLevelParamsClusters() {
+ return hostLevelParamsClusters;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
--
To stop receiving notification emails like this one, please contact
mpapirkovskyy@apache.org.