You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/10/18 16:01:22 UTC

[1/3] ambari git commit: AMBARI-22261. Update format of server-agent /agents/host_level_params response according to changed repo_info mapping. (mpapirkovskyy)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf dddce4dd3 -> f3e98bf7d


AMBARI-22261. Update format of server-agent /agents/host_level_params response according to changed repo_info mapping. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/80354430
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/80354430
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/80354430

Branch: refs/heads/branch-3.0-perf
Commit: 80354430213e6a55e2a3aa88aec4593f3fe83691
Parents: dddce4d
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Oct 12 18:04:24 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Oct 18 13:56:26 2017 +0300

----------------------------------------------------------------------
 .../ambari/server/agent/CommandRepository.java  | 29 +++++++++++++++
 .../agent/stomp/HostLevelParamsHolder.java      | 11 +++---
 .../agent/stomp/dto/HostLevelParamsCluster.java | 20 +++++-----
 .../agent/stomp/dto/HostRepositories.java       | 39 ++++++++++++++++++++
 .../controller/AmbariManagementController.java  |  3 ++
 .../AmbariManagementControllerImpl.java         | 26 +++++++++++++
 .../internal/HostResourceProvider.java          |  5 +--
 .../ambari/server/state/ConfigHelper.java       |  1 +
 8 files changed, 117 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
index 301f475..aea8ff5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
@@ -25,6 +25,7 @@ import org.apache.ambari.server.orm.entities.RepositoryEntity;
 import org.apache.ambari.server.state.RepositoryInfo;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.gson.annotations.SerializedName;
 
 /**
@@ -34,15 +35,19 @@ import com.google.gson.annotations.SerializedName;
 public class CommandRepository {
 
   @SerializedName("repositories")
+  @JsonProperty("repositories")
   private List<Repository> m_repositories = new ArrayList<>();
 
   @SerializedName("repoVersion")
+  @JsonProperty("repoVersion")
   private String m_repoVersion;
 
   @SerializedName("repoVersionId")
+  @JsonProperty("repoVersionId")
   private long m_repoVersionId;
 
   @SerializedName("stackName")
+  @JsonProperty("stackName")
   private String m_stackName;
 
   /**
@@ -125,6 +130,10 @@ public class CommandRepository {
     }
   }
 
+  public long getM_repoVersionId() {
+    return m_repoVersionId;
+  }
+
   /**
    * Gets whether this repository has been marked as having its version
    * resolved.
@@ -153,16 +162,20 @@ public class CommandRepository {
   public static class Repository {
 
     @SerializedName("baseUrl")
+    @JsonProperty("baseUrl")
     private String m_baseUrl;
 
     @SerializedName("repoId")
+    @JsonProperty("repoId")
     private String m_repoId;
 
     @SerializedName("ambariManaged")
+    @JsonProperty("ambariManaged")
     private boolean m_ambariManaged = true;
 
 
     @SerializedName("repoName")
+    @JsonProperty("repoName")
     private final String m_repoName;
 
     @SerializedName("distribution")
@@ -172,6 +185,7 @@ public class CommandRepository {
     private final String m_components;
 
     @SerializedName("mirrorsList")
+    @JsonProperty("mirrorsList")
     private String m_mirrorsList;
 
     private transient String m_osType;
@@ -244,4 +258,19 @@ public class CommandRepository {
     }
 
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CommandRepository that = (CommandRepository) o;
+
+    return m_repoVersionId == that.m_repoVersionId;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) (m_repoVersionId ^ (m_repoVersionId >>> 32));
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
index 7e02590..d2d8898 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
@@ -22,13 +22,14 @@ import java.util.TreeMap;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.RecoveryConfigHelper;
 import org.apache.ambari.server.agent.stomp.dto.HostLevelParamsCluster;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.events.HostLevelParamsUpdateEvent;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
 @Singleton
@@ -38,25 +39,25 @@ public class HostLevelParamsHolder extends AgentHostDataHolder<HostLevelParamsUp
   private RecoveryConfigHelper recoveryConfigHelper;
 
   @Inject
-  private AmbariMetaInfo ambariMetaInfo;
+  private Clusters clusters;
 
   @Inject
-  private Clusters clusters;
+  private Provider<AmbariManagementController> m_ambariManagementController;
 
   @Override
   public HostLevelParamsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
     TreeMap<String, HostLevelParamsCluster> hostLevelParamsClusters = new TreeMap<>();
     Host host = clusters.getHostById(hostId);
     for (Cluster cl : clusters.getClustersForHost(host.getHostName())) {
-      //TODO fix repo info host param
       HostLevelParamsCluster hostLevelParamsCluster = new HostLevelParamsCluster(
-          null,//ambariMetaInfo.getRepoInfo(cl, host),
+          m_ambariManagementController.get().retrieveHostRepositories(cl, host),
           recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), host.getHostName()));
 
       hostLevelParamsClusters.put(Long.toString(cl.getClusterId()),
           hostLevelParamsCluster);
     }
     HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent = new HostLevelParamsUpdateEvent(hostLevelParamsClusters);
+    hostLevelParamsUpdateEvent.setHostId(hostId);
     return hostLevelParamsUpdateEvent;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
index e4e28bf..5ca72e0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
@@ -17,30 +17,32 @@
  */
 package org.apache.ambari.server.agent.stomp.dto;
 
-import java.util.List;
 
 import org.apache.ambari.server.agent.RecoveryConfig;
-import org.apache.ambari.server.state.RepositoryInfo;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 @JsonInclude(JsonInclude.Include.NON_EMPTY)
 public class HostLevelParamsCluster {
 
-  private List<RepositoryInfo> repoInfo;
+  @JsonProperty("hostRepositories")
+  private HostRepositories hostRepositories;
+
+  @JsonProperty("recoveryConfig")
   private RecoveryConfig recoveryConfig;
 
-  public HostLevelParamsCluster(List<RepositoryInfo> repoInfo, RecoveryConfig recoveryConfig) {
-    this.repoInfo = repoInfo;
+  public HostLevelParamsCluster(HostRepositories hostRepositories, RecoveryConfig recoveryConfig) {
+    this.hostRepositories = hostRepositories;
     this.recoveryConfig = recoveryConfig;
   }
 
-  public List<RepositoryInfo> getRepoInfo() {
-    return repoInfo;
+  public HostRepositories getHostRepositories() {
+    return hostRepositories;
   }
 
-  public void setRepoInfo(List<RepositoryInfo> repoInfo) {
-    this.repoInfo = repoInfo;
+  public void setHostRepositories(HostRepositories hostRepositories) {
+    this.hostRepositories = hostRepositories;
   }
 
   public RecoveryConfig getRecoveryConfig() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
new file mode 100644
index 0000000..8d0cfb0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
@@ -0,0 +1,39 @@
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.Map;
+
+import org.apache.ambari.server.agent.CommandRepository;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+public class HostRepositories {
+
+  @JsonProperty("commandRepos")
+  private Map<Long, CommandRepository> repositories;
+
+  @JsonProperty("componentRepos")
+  private Map<String, Long> componentRepos;
+
+  public HostRepositories(Map<Long, CommandRepository> repositories, Map<String, Long> componentRepos) {
+    this.repositories = repositories;
+    this.componentRepos = componentRepos;
+  }
+
+  public Map<Long, CommandRepository> getRepositories() {
+    return repositories;
+  }
+
+  public void setRepositories(Map<Long, CommandRepository> repositories) {
+    this.repositories = repositories;
+  }
+
+  public Map<String, Long> getComponentRepos() {
+    return componentRepos;
+  }
+
+  public void setComponentRepos(Map<String, Long> componentRepos) {
+    this.componentRepos = componentRepos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index 4b37d00..98128ef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -27,6 +27,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.agent.stomp.dto.HostRepositories;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.api.services.LoggingService;
 import org.apache.ambari.server.controller.internal.DeleteStatusMetaData;
@@ -49,6 +50,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.Service;
@@ -918,5 +920,6 @@ public interface AmbariManagementController {
 
   void saveConfigGroupUpdate(ConfigGroupRequest configGroupRequest, ConfigGroupResponse configGroupResponse);
 
+  HostRepositories retrieveHostRepositories(Cluster cluster, Host host) throws AmbariException;
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 9db5832..6a7f050 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -102,10 +102,13 @@ import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.agent.CommandRepository;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.agent.stomp.AgentConfigsHolder;
+import org.apache.ambari.server.agent.stomp.HostLevelParamsHolder;
 import org.apache.ambari.server.agent.stomp.MetadataHolder;
 import org.apache.ambari.server.agent.stomp.TopologyHolder;
+import org.apache.ambari.server.agent.stomp.dto.HostRepositories;
 import org.apache.ambari.server.agent.stomp.dto.MetadataCluster;
 import org.apache.ambari.server.agent.stomp.dto.MetadataServiceInfo;
 import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
@@ -364,6 +367,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
   private Provider<AgentConfigsHolder> m_agentConfigsHolder;
 
   @Inject
+  private Provider<HostLevelParamsHolder> m_hostLevelParamsHolder;
+
+  @Inject
   private ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO;
 
   /**
@@ -746,6 +752,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
   private TopologyUpdateEvent getAddedComponentsTopologyEvent(Set<ServiceComponentHostRequest> requests)
     throws AmbariException {
     TreeMap<String, TopologyCluster> topologyUpdates = new TreeMap<>();
+    Set<String> hostsToUpdate = new HashSet<>();
     for (ServiceComponentHostRequest request : requests) {
       String serviceName = request.getServiceName();
       String componentName = request.getComponentName();
@@ -754,6 +761,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
       Service s = cluster.getService(serviceName);
       ServiceComponent sc = s.getServiceComponent(componentName);
       String hostName = request.getHostname();
+      hostsToUpdate.add(hostName);
       Set<Long> hostIds = clusterHosts.stream()
           .filter(h -> hostName.equals(h.getHostName()))
           .map(h -> h.getHostId()).collect(Collectors.toSet());
@@ -790,6 +798,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
         topologyUpdates.get(clusterId).addTopologyComponent(newComponent);
       }
     }
+    for (String hostName : hostsToUpdate) {
+      Host host = clusters.getHost(hostName);
+      m_hostLevelParamsHolder.get().updateData(m_hostLevelParamsHolder.get().getCurrentData(host.getHostId()));
+    }
     return new TopologyUpdateEvent(topologyUpdates, TopologyUpdateEvent.EventType.UPDATE);
   }
 
@@ -6032,4 +6044,18 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
 
     return clusterLevelParams;
   }
+
+  @Override
+  public HostRepositories retrieveHostRepositories(Cluster cluster, Host host) throws AmbariException {
+    List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(host.getHostName());
+    Map<Long, CommandRepository> hostRepositories = new HashMap<>();
+    Map<String, Long> componentsRepos = new HashMap<>();
+    for (ServiceComponentHost serviceComponentHost : hostComponents) {
+      CommandRepository commandRepository = ambariMetaInfo.getCommandRepository(cluster,
+          serviceComponentHost.getServiceComponent(), host);
+      hostRepositories.put(commandRepository.getM_repoVersionId(), commandRepository);
+      componentsRepos.put(serviceComponentHost.getServiceComponentName(), commandRepository.getM_repoVersionId());
+    }
+    return new HostRepositories(hostRepositories, componentsRepos);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
index 1fd6697..5cc360f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
@@ -535,10 +535,9 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
             addedHost.getHostName(),
             addedHost.getRackInfo(),
             addedHost.getIPv4()));
-        //TODO fix repo info param
         HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent = new HostLevelParamsUpdateEvent(clusterId, new HostLevelParamsCluster(
-            null,//ambariMetaInfo.getRepoInfo(cl, addedHost),
-            recoveryConfigHelper.getRecoveryConfig(clusters.getCluster(hostRequest.getClusterName()).getClusterName(),
+            getManagementController().retrieveHostRepositories(cl, addedHost),
+            recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(),
                 addedHost.getHostName())
         ));
         hostLevelParamsUpdateEvent.setHostId(addedHost.getHostId());

http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
index 0d1fd5f..ffd2414 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
@@ -1587,6 +1587,7 @@ public class ConfigHelper {
     }
 
     AgentConfigsUpdateEvent agentConfigsUpdateEvent = new AgentConfigsUpdateEvent(clustersConfigs);
+    agentConfigsUpdateEvent.setHostId(hostId);
     return agentConfigsUpdateEvent;
   }
 


[2/3] ambari git commit: AMBARI-22262. Server should fire a host level params update event when repository version is resolved. (mpapirkovskyy)

Posted by mp...@apache.org.
AMBARI-22262. Server should fire a host level params update event when repository version is resolved. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/62319963
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/62319963
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/62319963

Branch: refs/heads/branch-3.0-perf
Commit: 62319963b02d2cc88326ae28944b48a5869040aa
Parents: 8035443
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Oct 17 17:18:25 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Oct 18 16:39:58 2017 +0300

----------------------------------------------------------------------
 .../ambari/server/api/services/AmbariMetaInfo.java    |  1 +
 .../listeners/upgrade/StackVersionListener.java       | 12 +++++++++++-
 .../listeners/upgrade/StackVersionListenerTest.java   | 14 +++++++-------
 3 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/62319963/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index 2a794de..407238d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -1527,6 +1527,7 @@ public class AmbariMetaInfo {
         command.setRepositoryVersionId(rve.getId());
         command.setRepositoryVersion(rve.getVersion());
         command.setStackName(rve.getStackName());
+        command.setResolved(rve.isResolved());
 
         // !!! a repository version entity has all the repos worked out.  We shouldn't use
         // the stack at all.

http://git-wip-us.apache.org/repos/asf/ambari/blob/62319963/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
index 4329cdb..7cd8360 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners.upgrade;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.agent.stomp.HostLevelParamsHolder;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.events.HostComponentVersionAdvertisedEvent;
 import org.apache.ambari.server.events.publishers.VersionEventPublisher;
@@ -65,6 +66,9 @@ public class StackVersionListener {
   @Inject
   private Provider<AmbariMetaInfo> ambariMetaInfoProvider;
 
+  @Inject
+  private Provider<HostLevelParamsHolder> m_hostLevelParamsHolder;
+
   /**
    * Constructor.
    *
@@ -76,7 +80,7 @@ public class StackVersionListener {
   }
 
   @Subscribe
-  public void onAmbariEvent(HostComponentVersionAdvertisedEvent event) {
+  public void onAmbariEvent(HostComponentVersionAdvertisedEvent event) throws AmbariException {
     LOG.debug("Received event {}", event);
 
     Cluster cluster = event.getCluster();
@@ -96,19 +100,25 @@ public class StackVersionListener {
       // exact version is not known in advance.
       RepositoryVersionEntity rve = repositoryVersionDAO.findByPK(event.getRepositoryVersionId());
       if (null != rve) {
+        boolean updated = false;
         String currentRepoVersion = rve.getVersion();
         if (!StringUtils.equals(currentRepoVersion, newVersion)) {
           rve.setVersion(newVersion);
           rve.setResolved(true);
           repositoryVersionDAO.merge(rve);
+          updated = true;
         } else {
           // the reported versions are the same - we should ensure that the repo
           // is resolved
           if (!rve.isResolved()) {
             rve.setResolved(true);
             repositoryVersionDAO.merge(rve);
+            updated = true;
           }
         }
+        if (updated) {
+          m_hostLevelParamsHolder.get().updateData(m_hostLevelParamsHolder.get().getCurrentData(sch.getHost().getHostId()));
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/62319963/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
index ffacab9..1ed8d38 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
@@ -202,7 +202,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
 
 
   @Test
-  public void testNoActionTakenOnNullVersion() {
+  public void testNoActionTakenOnNullVersion() throws AmbariException {
     expect(componentInfo.isVersionAdvertised()).andReturn(true).once();
     resetAll();
     replayAll();
@@ -211,7 +211,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
   }
 
   @Test
-  public void testSetUpgradeStateToCompleteWhenUpgradeIsInProgressAndNewVersionIsEqualToComponentDesiredVersion() {
+  public void testSetUpgradeStateToCompleteWhenUpgradeIsInProgressAndNewVersionIsEqualToComponentDesiredVersion() throws AmbariException {
     expect(cluster.getUpgradeInProgress()).andReturn(DUMMY_UPGRADE_ENTITY);
 
     expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
@@ -227,7 +227,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
   }
 
   @Test
-  public void testSetUpgradeStateToNoneWhenNoUpgradeAndNewVersionIsEqualToComponentDesiredVersion() {
+  public void testSetUpgradeStateToNoneWhenNoUpgradeAndNewVersionIsEqualToComponentDesiredVersion() throws AmbariException {
     expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
     expect(sch.getUpgradeState()).andReturn(UpgradeState.IN_PROGRESS);
     expect(componentInfo.isVersionAdvertised()).andReturn(true).once();
@@ -258,7 +258,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
   }
 
   @Test
-  public void testSetUpgradeStateToCompleteWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersionAndClusterUpgradeIsInProgress() {
+  public void testSetUpgradeStateToCompleteWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersionAndClusterUpgradeIsInProgress() throws AmbariException {
     expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
     expect(sch.getUpgradeState()).andReturn(UpgradeState.VERSION_MISMATCH);
     expect(cluster.getUpgradeInProgress()).andReturn(DUMMY_UPGRADE_ENTITY);
@@ -273,7 +273,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
   }
 
   @Test
-  public void testSetUpgradeStateToNoneWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersion() {
+  public void testSetUpgradeStateToNoneWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersion() throws AmbariException {
     expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
     expect(sch.getUpgradeState()).andReturn(UpgradeState.VERSION_MISMATCH);
     expect(serviceComponent.getDesiredVersion()).andStubReturn(VALID_NEW_VERSION);
@@ -287,7 +287,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
   }
 
   @Test
-  public void testSetUpgradeStateToVersionMismatchByDefaultWhenHostAndNewVersionsAreValid() {
+  public void testSetUpgradeStateToVersionMismatchByDefaultWhenHostAndNewVersionsAreValid() throws AmbariException {
     expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
     expect(componentInfo.isVersionAdvertised()).andReturn(true).once();
     sch.setUpgradeState(UpgradeState.VERSION_MISMATCH);
@@ -406,7 +406,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
     verifyAll();
   }
 
-  private void sendEventAndVerify(String newVersion) {
+  private void sendEventAndVerify(String newVersion) throws AmbariException {
     HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cluster, sch, newVersion);
     listener.onAmbariEvent(event);
 


[3/3] ambari git commit: AMBARI-22264. Sometimes request and host component status updates are lost. (mpapirkovskyy)

Posted by mp...@apache.org.
AMBARI-22264. Sometimes request and host component status updates are lost. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f3e98bf7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f3e98bf7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f3e98bf7

Branch: refs/heads/branch-3.0-perf
Commit: f3e98bf7d3e807af6bb26e43c17322d9928fbe18
Parents: 6231996
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Oct 17 17:46:06 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Oct 18 17:48:56 2017 +0300

----------------------------------------------------------------------
 .../agent/stomp/AmbariSubscriptionRegistry.java     |  8 +++++++-
 .../publishers/BufferedUpdateEventPublisher.java    | 16 ++++++++--------
 .../HostComponentUpdateEventPublisher.java          | 13 +++++++++----
 .../publishers/ServiceUpdateEventPublisher.java     |  6 ++++--
 .../publishers/StateUpdateEventPublisher.java       |  5 ++---
 5 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
index aaab7bf..68330c6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -245,6 +245,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
     private final Map<String, LinkedMultiValueMap<String, String>> accessCache =
         new ConcurrentHashMap<>(cacheLimit);
 
+    //TODO optimize usage of this cache on perf cluster
     private final Cache<String, String> notSubscriptionCache =
         CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
 
@@ -275,7 +276,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
     }
 
     public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
-      this.accessCache.computeIfPresent(destination, (key, value) -> {
+      LinkedMultiValueMap<String, String> updatedMap = this.accessCache.computeIfPresent(destination, (key, value) -> {
         if (getPathMatcher().match(destination, key)) {
           LinkedMultiValueMap<String, String> subs = value.deepCopy();
           subs.add(sessionId, subsId);
@@ -283,6 +284,9 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
         }
         return value;
       });
+      if (updatedMap == null) {
+        this.notSubscriptionCache.invalidate(destination);
+      }
     }
 
     public void updateAfterRemovedSubscription(String sessionId, String subsId) {
@@ -301,6 +305,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
             iterator.remove();
           }
           else {
+            this.notSubscriptionCache.invalidate(destination);
             this.accessCache.put(destination, sessionMap.deepCopy());
           }
         }
@@ -318,6 +323,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
             iterator.remove();
           }
           else {
+            this.notSubscriptionCache.invalidate(destination);
             this.accessCache.put(destination, sessionMap.deepCopy());
           }
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index 75549c3..e02785f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -19,7 +19,6 @@
 package org.apache.ambari.server.events.publishers;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
@@ -34,21 +33,21 @@ import com.google.inject.Singleton;
 @Singleton
 public abstract class BufferedUpdateEventPublisher<T> {
 
-  private static final long TIMEOUT = 1L;
+  private static final long TIMEOUT = 1000L;
   private final AtomicLong previousTime = new AtomicLong(0);
   private final AtomicBoolean collecting = new AtomicBoolean(false);
   private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
   private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
-  public void publish(Collection<T> event, EventBus m_eventBus) {
+  public void publish(T event, EventBus m_eventBus) {
     long eventTime = System.currentTimeMillis();
-    if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) {
-      buffer.addAll(event);
+    if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) {
+      buffer.add(event);
       collecting.set(true);
-      scheduledExecutorService.schedule(getScheduledPublished(m_eventBus),
+      scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus),
           TIMEOUT, TimeUnit.MILLISECONDS);
     } else if (collecting.get()) {
-      buffer.addAll(event);
+      buffer.add(event);
     } else {
       //TODO add logging and metrics posting
       previousTime.set(eventTime);
@@ -56,9 +55,10 @@ public abstract class BufferedUpdateEventPublisher<T> {
     }
   }
 
-  protected abstract Runnable getScheduledPublished(EventBus m_eventBus);
+  protected abstract Runnable getScheduledPublisher(EventBus m_eventBus);
 
   protected List<T> retrieveBuffer() {
+    resetCollecting();
     List<T> bufferContent = new ArrayList<>();
     while (!buffer.isEmpty()) {
       bufferContent.add(buffer.poll());

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index a8c1b1d..f7fea1d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -19,6 +19,7 @@
 package org.apache.ambari.server.events.publishers;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.ambari.server.events.HostComponentUpdate;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
@@ -27,10 +28,10 @@ import com.google.common.eventbus.EventBus;
 import com.google.inject.Singleton;
 
 @Singleton
-public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentUpdate> {
+public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
 
   @Override
-  protected Runnable getScheduledPublished(EventBus m_eventBus) {
+  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
     return new HostComponentsEventRunnable(m_eventBus);
   }
 
@@ -44,12 +45,16 @@ public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublis
 
     @Override
     public void run() {
-      List<HostComponentUpdate> hostComponentUpdates = retrieveBuffer();
+      List<HostComponentsUpdateEvent> hostComponentUpdateEvents = retrieveBuffer();
+      if (hostComponentUpdateEvents.isEmpty()) {
+        return;
+      }
+      List<HostComponentUpdate> hostComponentUpdates = hostComponentUpdateEvents.stream().flatMap(
+          u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList());
 
       HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
       //TODO add logging and metrics posting
       eventBus.post(resultEvents);
-      resetCollecting();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 7bf1290..8f45859 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -30,7 +30,7 @@ import com.google.inject.Singleton;
 public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> {
 
   @Override
-  protected Runnable getScheduledPublished(EventBus m_eventBus) {
+  protected Runnable getScheduledPublisher(EventBus m_eventBus) {
     return new ServiceEventRunnable(m_eventBus);
   }
 
@@ -45,6 +45,9 @@ public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<Se
     @Override
     public void run() {
       List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer();
+      if (serviceUpdates.isEmpty()) {
+        return;
+      }
       List<ServiceUpdateEvent> filtered = new ArrayList<>();
       for (ServiceUpdateEvent event : serviceUpdates) {
         int pos = filtered.indexOf(event);
@@ -62,7 +65,6 @@ public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<Se
       for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) {
         eventBus.post(serviceUpdateEvent);
       }
-      resetCollecting();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
index 7d343a5..80c9813 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ambari.server.events.publishers;
 
-import java.util.Collections;
 import java.util.concurrent.Executors;
 
 import org.apache.ambari.server.events.AmbariUpdateEvent;
@@ -53,9 +52,9 @@ public class StateUpdateEventPublisher {
     if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) {
       requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus);
     } else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) {
-      hostComponentUpdateEventPublisher.publish(((HostComponentsUpdateEvent) event).getHostComponentUpdates(), m_eventBus);
+      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus);
     } else if (event.getType().equals(AmbariUpdateEvent.Type.SERVICE)) {
-      serviceUpdateEventPublisher.publish(Collections.singletonList((ServiceUpdateEvent) event), m_eventBus);
+      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, m_eventBus);
     } else {
       m_eventBus.post(event);
     }