You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/10/18 16:01:22 UTC
[1/3] ambari git commit: AMBARI-22261. Update format of server-agent
/agents/host_level_params response according to changed repo_info mapping.
(mpapirkovskyy)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf dddce4dd3 -> f3e98bf7d
AMBARI-22261. Update format of server-agent /agents/host_level_params response according to changed repo_info mapping. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/80354430
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/80354430
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/80354430
Branch: refs/heads/branch-3.0-perf
Commit: 80354430213e6a55e2a3aa88aec4593f3fe83691
Parents: dddce4d
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Oct 12 18:04:24 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Oct 18 13:56:26 2017 +0300
----------------------------------------------------------------------
.../ambari/server/agent/CommandRepository.java | 29 +++++++++++++++
.../agent/stomp/HostLevelParamsHolder.java | 11 +++---
.../agent/stomp/dto/HostLevelParamsCluster.java | 20 +++++-----
.../agent/stomp/dto/HostRepositories.java | 39 ++++++++++++++++++++
.../controller/AmbariManagementController.java | 3 ++
.../AmbariManagementControllerImpl.java | 26 +++++++++++++
.../internal/HostResourceProvider.java | 5 +--
.../ambari/server/state/ConfigHelper.java | 1 +
8 files changed, 117 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
index 301f475..aea8ff5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandRepository.java
@@ -25,6 +25,7 @@ import org.apache.ambari.server.orm.entities.RepositoryEntity;
import org.apache.ambari.server.state.RepositoryInfo;
import org.apache.commons.lang.builder.ToStringBuilder;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.annotations.SerializedName;
/**
@@ -34,15 +35,19 @@ import com.google.gson.annotations.SerializedName;
public class CommandRepository {
@SerializedName("repositories")
+ @JsonProperty("repositories")
private List<Repository> m_repositories = new ArrayList<>();
@SerializedName("repoVersion")
+ @JsonProperty("repoVersion")
private String m_repoVersion;
@SerializedName("repoVersionId")
+ @JsonProperty("repoVersionId")
private long m_repoVersionId;
@SerializedName("stackName")
+ @JsonProperty("stackName")
private String m_stackName;
/**
@@ -125,6 +130,10 @@ public class CommandRepository {
}
}
+ public long getM_repoVersionId() {
+ return m_repoVersionId;
+ }
+
/**
* Gets whether this repository has been marked as having its version
* resolved.
@@ -153,16 +162,20 @@ public class CommandRepository {
public static class Repository {
@SerializedName("baseUrl")
+ @JsonProperty("baseUrl")
private String m_baseUrl;
@SerializedName("repoId")
+ @JsonProperty("repoId")
private String m_repoId;
@SerializedName("ambariManaged")
+ @JsonProperty("ambariManaged")
private boolean m_ambariManaged = true;
@SerializedName("repoName")
+ @JsonProperty("repoName")
private final String m_repoName;
@SerializedName("distribution")
@@ -172,6 +185,7 @@ public class CommandRepository {
private final String m_components;
@SerializedName("mirrorsList")
+ @JsonProperty("mirrorsList")
private String m_mirrorsList;
private transient String m_osType;
@@ -244,4 +258,19 @@ public class CommandRepository {
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CommandRepository that = (CommandRepository) o;
+
+ return m_repoVersionId == that.m_repoVersionId;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (m_repoVersionId ^ (m_repoVersionId >>> 32));
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
index 7e02590..d2d8898 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolder.java
@@ -22,13 +22,14 @@ import java.util.TreeMap;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.RecoveryConfigHelper;
import org.apache.ambari.server.agent.stomp.dto.HostLevelParamsCluster;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.events.HostLevelParamsUpdateEvent;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import com.google.inject.Singleton;
@Singleton
@@ -38,25 +39,25 @@ public class HostLevelParamsHolder extends AgentHostDataHolder<HostLevelParamsUp
private RecoveryConfigHelper recoveryConfigHelper;
@Inject
- private AmbariMetaInfo ambariMetaInfo;
+ private Clusters clusters;
@Inject
- private Clusters clusters;
+ private Provider<AmbariManagementController> m_ambariManagementController;
@Override
public HostLevelParamsUpdateEvent getCurrentData(Long hostId) throws AmbariException {
TreeMap<String, HostLevelParamsCluster> hostLevelParamsClusters = new TreeMap<>();
Host host = clusters.getHostById(hostId);
for (Cluster cl : clusters.getClustersForHost(host.getHostName())) {
- //TODO fix repo info host param
HostLevelParamsCluster hostLevelParamsCluster = new HostLevelParamsCluster(
- null,//ambariMetaInfo.getRepoInfo(cl, host),
+ m_ambariManagementController.get().retrieveHostRepositories(cl, host),
recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), host.getHostName()));
hostLevelParamsClusters.put(Long.toString(cl.getClusterId()),
hostLevelParamsCluster);
}
HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent = new HostLevelParamsUpdateEvent(hostLevelParamsClusters);
+ hostLevelParamsUpdateEvent.setHostId(hostId);
return hostLevelParamsUpdateEvent;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
index e4e28bf..5ca72e0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostLevelParamsCluster.java
@@ -17,30 +17,32 @@
*/
package org.apache.ambari.server.agent.stomp.dto;
-import java.util.List;
import org.apache.ambari.server.agent.RecoveryConfig;
-import org.apache.ambari.server.state.RepositoryInfo;
import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class HostLevelParamsCluster {
- private List<RepositoryInfo> repoInfo;
+ @JsonProperty("hostRepositories")
+ private HostRepositories hostRepositories;
+
+ @JsonProperty("recoveryConfig")
private RecoveryConfig recoveryConfig;
- public HostLevelParamsCluster(List<RepositoryInfo> repoInfo, RecoveryConfig recoveryConfig) {
- this.repoInfo = repoInfo;
+ public HostLevelParamsCluster(HostRepositories hostRepositories, RecoveryConfig recoveryConfig) {
+ this.hostRepositories = hostRepositories;
this.recoveryConfig = recoveryConfig;
}
- public List<RepositoryInfo> getRepoInfo() {
- return repoInfo;
+ public HostRepositories getHostRepositories() {
+ return hostRepositories;
}
- public void setRepoInfo(List<RepositoryInfo> repoInfo) {
- this.repoInfo = repoInfo;
+ public void setHostRepositories(HostRepositories hostRepositories) {
+ this.hostRepositories = hostRepositories;
}
public RecoveryConfig getRecoveryConfig() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
new file mode 100644
index 0000000..8d0cfb0
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/HostRepositories.java
@@ -0,0 +1,39 @@
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.Map;
+
+import org.apache.ambari.server.agent.CommandRepository;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+public class HostRepositories {
+
+ @JsonProperty("commandRepos")
+ private Map<Long, CommandRepository> repositories;
+
+ @JsonProperty("componentRepos")
+ private Map<String, Long> componentRepos;
+
+ public HostRepositories(Map<Long, CommandRepository> repositories, Map<String, Long> componentRepos) {
+ this.repositories = repositories;
+ this.componentRepos = componentRepos;
+ }
+
+ public Map<Long, CommandRepository> getRepositories() {
+ return repositories;
+ }
+
+ public void setRepositories(Map<Long, CommandRepository> repositories) {
+ this.repositories = repositories;
+ }
+
+ public Map<String, Long> getComponentRepos() {
+ return componentRepos;
+ }
+
+ public void setComponentRepos(Map<String, Long> componentRepos) {
+ this.componentRepos = componentRepos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index 4b37d00..98128ef 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -27,6 +27,7 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.agent.stomp.dto.HostRepositories;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.api.services.LoggingService;
import org.apache.ambari.server.controller.internal.DeleteStatusMetaData;
@@ -49,6 +50,7 @@ import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.Service;
@@ -918,5 +920,6 @@ public interface AmbariManagementController {
void saveConfigGroupUpdate(ConfigGroupRequest configGroupRequest, ConfigGroupResponse configGroupResponse);
+ HostRepositories retrieveHostRepositories(Cluster cluster, Host host) throws AmbariException;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 9db5832..6a7f050 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -102,10 +102,13 @@ import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.RequestFactory;
import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.actionmanager.StageFactory;
+import org.apache.ambari.server.agent.CommandRepository;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.agent.stomp.AgentConfigsHolder;
+import org.apache.ambari.server.agent.stomp.HostLevelParamsHolder;
import org.apache.ambari.server.agent.stomp.MetadataHolder;
import org.apache.ambari.server.agent.stomp.TopologyHolder;
+import org.apache.ambari.server.agent.stomp.dto.HostRepositories;
import org.apache.ambari.server.agent.stomp.dto.MetadataCluster;
import org.apache.ambari.server.agent.stomp.dto.MetadataServiceInfo;
import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
@@ -364,6 +367,9 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
private Provider<AgentConfigsHolder> m_agentConfigsHolder;
@Inject
+ private Provider<HostLevelParamsHolder> m_hostLevelParamsHolder;
+
+ @Inject
private ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO;
/**
@@ -746,6 +752,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
private TopologyUpdateEvent getAddedComponentsTopologyEvent(Set<ServiceComponentHostRequest> requests)
throws AmbariException {
TreeMap<String, TopologyCluster> topologyUpdates = new TreeMap<>();
+ Set<String> hostsToUpdate = new HashSet<>();
for (ServiceComponentHostRequest request : requests) {
String serviceName = request.getServiceName();
String componentName = request.getComponentName();
@@ -754,6 +761,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
Service s = cluster.getService(serviceName);
ServiceComponent sc = s.getServiceComponent(componentName);
String hostName = request.getHostname();
+ hostsToUpdate.add(hostName);
Set<Long> hostIds = clusterHosts.stream()
.filter(h -> hostName.equals(h.getHostName()))
.map(h -> h.getHostId()).collect(Collectors.toSet());
@@ -790,6 +798,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
topologyUpdates.get(clusterId).addTopologyComponent(newComponent);
}
}
+ for (String hostName : hostsToUpdate) {
+ Host host = clusters.getHost(hostName);
+ m_hostLevelParamsHolder.get().updateData(m_hostLevelParamsHolder.get().getCurrentData(host.getHostId()));
+ }
return new TopologyUpdateEvent(topologyUpdates, TopologyUpdateEvent.EventType.UPDATE);
}
@@ -6032,4 +6044,18 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
return clusterLevelParams;
}
+
+ @Override
+ public HostRepositories retrieveHostRepositories(Cluster cluster, Host host) throws AmbariException {
+ List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(host.getHostName());
+ Map<Long, CommandRepository> hostRepositories = new HashMap<>();
+ Map<String, Long> componentsRepos = new HashMap<>();
+ for (ServiceComponentHost serviceComponentHost : hostComponents) {
+ CommandRepository commandRepository = ambariMetaInfo.getCommandRepository(cluster,
+ serviceComponentHost.getServiceComponent(), host);
+ hostRepositories.put(commandRepository.getM_repoVersionId(), commandRepository);
+ componentsRepos.put(serviceComponentHost.getServiceComponentName(), commandRepository.getM_repoVersionId());
+ }
+ return new HostRepositories(hostRepositories, componentsRepos);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
index 1fd6697..5cc360f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
@@ -535,10 +535,9 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
addedHost.getHostName(),
addedHost.getRackInfo(),
addedHost.getIPv4()));
- //TODO fix repo info param
HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent = new HostLevelParamsUpdateEvent(clusterId, new HostLevelParamsCluster(
- null,//ambariMetaInfo.getRepoInfo(cl, addedHost),
- recoveryConfigHelper.getRecoveryConfig(clusters.getCluster(hostRequest.getClusterName()).getClusterName(),
+ getManagementController().retrieveHostRepositories(cl, addedHost),
+ recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(),
addedHost.getHostName())
));
hostLevelParamsUpdateEvent.setHostId(addedHost.getHostId());
http://git-wip-us.apache.org/repos/asf/ambari/blob/80354430/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
index 0d1fd5f..ffd2414 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java
@@ -1587,6 +1587,7 @@ public class ConfigHelper {
}
AgentConfigsUpdateEvent agentConfigsUpdateEvent = new AgentConfigsUpdateEvent(clustersConfigs);
+ agentConfigsUpdateEvent.setHostId(hostId);
return agentConfigsUpdateEvent;
}
[2/3] ambari git commit: AMBARI-22262. Server should fire a host
level params update event when repository version is resolved.
(mpapirkovskyy)
Posted by mp...@apache.org.
AMBARI-22262. Server should fire a host level params update event when repository version is resolved. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/62319963
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/62319963
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/62319963
Branch: refs/heads/branch-3.0-perf
Commit: 62319963b02d2cc88326ae28944b48a5869040aa
Parents: 8035443
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Oct 17 17:18:25 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Oct 18 16:39:58 2017 +0300
----------------------------------------------------------------------
.../ambari/server/api/services/AmbariMetaInfo.java | 1 +
.../listeners/upgrade/StackVersionListener.java | 12 +++++++++++-
.../listeners/upgrade/StackVersionListenerTest.java | 14 +++++++-------
3 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/62319963/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index 2a794de..407238d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -1527,6 +1527,7 @@ public class AmbariMetaInfo {
command.setRepositoryVersionId(rve.getId());
command.setRepositoryVersion(rve.getVersion());
command.setStackName(rve.getStackName());
+ command.setResolved(rve.isResolved());
// !!! a repository version entity has all the repos worked out. We shouldn't use
// the stack at all.
http://git-wip-us.apache.org/repos/asf/ambari/blob/62319963/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
index 4329cdb..7cd8360 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners.upgrade;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.agent.stomp.HostLevelParamsHolder;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.events.HostComponentVersionAdvertisedEvent;
import org.apache.ambari.server.events.publishers.VersionEventPublisher;
@@ -65,6 +66,9 @@ public class StackVersionListener {
@Inject
private Provider<AmbariMetaInfo> ambariMetaInfoProvider;
+ @Inject
+ private Provider<HostLevelParamsHolder> m_hostLevelParamsHolder;
+
/**
* Constructor.
*
@@ -76,7 +80,7 @@ public class StackVersionListener {
}
@Subscribe
- public void onAmbariEvent(HostComponentVersionAdvertisedEvent event) {
+ public void onAmbariEvent(HostComponentVersionAdvertisedEvent event) throws AmbariException {
LOG.debug("Received event {}", event);
Cluster cluster = event.getCluster();
@@ -96,19 +100,25 @@ public class StackVersionListener {
// exact version is not known in advance.
RepositoryVersionEntity rve = repositoryVersionDAO.findByPK(event.getRepositoryVersionId());
if (null != rve) {
+ boolean updated = false;
String currentRepoVersion = rve.getVersion();
if (!StringUtils.equals(currentRepoVersion, newVersion)) {
rve.setVersion(newVersion);
rve.setResolved(true);
repositoryVersionDAO.merge(rve);
+ updated = true;
} else {
// the reported versions are the same - we should ensure that the repo
// is resolved
if (!rve.isResolved()) {
rve.setResolved(true);
repositoryVersionDAO.merge(rve);
+ updated = true;
}
}
+ if (updated) {
+ m_hostLevelParamsHolder.get().updateData(m_hostLevelParamsHolder.get().getCurrentData(sch.getHost().getHostId()));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/62319963/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
index ffacab9..1ed8d38 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
@@ -202,7 +202,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
@Test
- public void testNoActionTakenOnNullVersion() {
+ public void testNoActionTakenOnNullVersion() throws AmbariException {
expect(componentInfo.isVersionAdvertised()).andReturn(true).once();
resetAll();
replayAll();
@@ -211,7 +211,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
}
@Test
- public void testSetUpgradeStateToCompleteWhenUpgradeIsInProgressAndNewVersionIsEqualToComponentDesiredVersion() {
+ public void testSetUpgradeStateToCompleteWhenUpgradeIsInProgressAndNewVersionIsEqualToComponentDesiredVersion() throws AmbariException {
expect(cluster.getUpgradeInProgress()).andReturn(DUMMY_UPGRADE_ENTITY);
expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
@@ -227,7 +227,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
}
@Test
- public void testSetUpgradeStateToNoneWhenNoUpgradeAndNewVersionIsEqualToComponentDesiredVersion() {
+ public void testSetUpgradeStateToNoneWhenNoUpgradeAndNewVersionIsEqualToComponentDesiredVersion() throws AmbariException {
expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
expect(sch.getUpgradeState()).andReturn(UpgradeState.IN_PROGRESS);
expect(componentInfo.isVersionAdvertised()).andReturn(true).once();
@@ -258,7 +258,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
}
@Test
- public void testSetUpgradeStateToCompleteWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersionAndClusterUpgradeIsInProgress() {
+ public void testSetUpgradeStateToCompleteWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersionAndClusterUpgradeIsInProgress() throws AmbariException {
expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
expect(sch.getUpgradeState()).andReturn(UpgradeState.VERSION_MISMATCH);
expect(cluster.getUpgradeInProgress()).andReturn(DUMMY_UPGRADE_ENTITY);
@@ -273,7 +273,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
}
@Test
- public void testSetUpgradeStateToNoneWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersion() {
+ public void testSetUpgradeStateToNoneWhenHostHasVersionMismatchAndNewVersionIsEqualToComponentDesiredVersion() throws AmbariException {
expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
expect(sch.getUpgradeState()).andReturn(UpgradeState.VERSION_MISMATCH);
expect(serviceComponent.getDesiredVersion()).andStubReturn(VALID_NEW_VERSION);
@@ -287,7 +287,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
}
@Test
- public void testSetUpgradeStateToVersionMismatchByDefaultWhenHostAndNewVersionsAreValid() {
+ public void testSetUpgradeStateToVersionMismatchByDefaultWhenHostAndNewVersionsAreValid() throws AmbariException {
expect(sch.getVersion()).andReturn(VALID_PREVIOUS_VERSION);
expect(componentInfo.isVersionAdvertised()).andReturn(true).once();
sch.setUpgradeState(UpgradeState.VERSION_MISMATCH);
@@ -406,7 +406,7 @@ public class StackVersionListenerTest extends EasyMockSupport {
verifyAll();
}
- private void sendEventAndVerify(String newVersion) {
+ private void sendEventAndVerify(String newVersion) throws AmbariException {
HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cluster, sch, newVersion);
listener.onAmbariEvent(event);
[3/3] ambari git commit: AMBARI-22264. Sometimes request and host
component status updates are lost. (mpapirkovskyy)
Posted by mp...@apache.org.
AMBARI-22264. Sometimes request and host component status updates are lost. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f3e98bf7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f3e98bf7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f3e98bf7
Branch: refs/heads/branch-3.0-perf
Commit: f3e98bf7d3e807af6bb26e43c17322d9928fbe18
Parents: 6231996
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Oct 17 17:46:06 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Oct 18 17:48:56 2017 +0300
----------------------------------------------------------------------
.../agent/stomp/AmbariSubscriptionRegistry.java | 8 +++++++-
.../publishers/BufferedUpdateEventPublisher.java | 16 ++++++++--------
.../HostComponentUpdateEventPublisher.java | 13 +++++++++----
.../publishers/ServiceUpdateEventPublisher.java | 6 ++++--
.../publishers/StateUpdateEventPublisher.java | 5 ++---
5 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
index aaab7bf..68330c6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -245,6 +245,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
private final Map<String, LinkedMultiValueMap<String, String>> accessCache =
new ConcurrentHashMap<>(cacheLimit);
+ //TODO optimize usage of this cache on perf cluster
private final Cache<String, String> notSubscriptionCache =
CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
@@ -275,7 +276,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
- this.accessCache.computeIfPresent(destination, (key, value) -> {
+ LinkedMultiValueMap<String, String> updatedMap = this.accessCache.computeIfPresent(destination, (key, value) -> {
if (getPathMatcher().match(destination, key)) {
LinkedMultiValueMap<String, String> subs = value.deepCopy();
subs.add(sessionId, subsId);
@@ -283,6 +284,9 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
return value;
});
+ if (updatedMap == null) {
+ this.notSubscriptionCache.invalidate(destination);
+ }
}
public void updateAfterRemovedSubscription(String sessionId, String subsId) {
@@ -301,6 +305,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
iterator.remove();
}
else {
+ this.notSubscriptionCache.invalidate(destination);
this.accessCache.put(destination, sessionMap.deepCopy());
}
}
@@ -318,6 +323,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
iterator.remove();
}
else {
+ this.notSubscriptionCache.invalidate(destination);
this.accessCache.put(destination, sessionMap.deepCopy());
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
index 75549c3..e02785f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/BufferedUpdateEventPublisher.java
@@ -19,7 +19,6 @@
package org.apache.ambari.server.events.publishers;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
@@ -34,21 +33,21 @@ import com.google.inject.Singleton;
@Singleton
public abstract class BufferedUpdateEventPublisher<T> {
- private static final long TIMEOUT = 1L;
+ private static final long TIMEOUT = 1000L;
private final AtomicLong previousTime = new AtomicLong(0);
private final AtomicBoolean collecting = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
- public void publish(Collection<T> event, EventBus m_eventBus) {
+ public void publish(T event, EventBus m_eventBus) {
long eventTime = System.currentTimeMillis();
- if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) {
- buffer.addAll(event);
+ if ((eventTime - previousTime.get() <= TIMEOUT) && !collecting.get()) {
+ buffer.add(event);
collecting.set(true);
- scheduledExecutorService.schedule(getScheduledPublished(m_eventBus),
+ scheduledExecutorService.schedule(getScheduledPublisher(m_eventBus),
TIMEOUT, TimeUnit.MILLISECONDS);
} else if (collecting.get()) {
- buffer.addAll(event);
+ buffer.add(event);
} else {
//TODO add logging and metrics posting
previousTime.set(eventTime);
@@ -56,9 +55,10 @@ public abstract class BufferedUpdateEventPublisher<T> {
}
}
- protected abstract Runnable getScheduledPublished(EventBus m_eventBus);
+ protected abstract Runnable getScheduledPublisher(EventBus m_eventBus);
protected List<T> retrieveBuffer() {
+ resetCollecting();
List<T> bufferContent = new ArrayList<>();
while (!buffer.isEmpty()) {
bufferContent.add(buffer.poll());
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
index a8c1b1d..f7fea1d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -19,6 +19,7 @@
package org.apache.ambari.server.events.publishers;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.ambari.server.events.HostComponentUpdate;
import org.apache.ambari.server.events.HostComponentsUpdateEvent;
@@ -27,10 +28,10 @@ import com.google.common.eventbus.EventBus;
import com.google.inject.Singleton;
@Singleton
-public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentUpdate> {
+public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublisher<HostComponentsUpdateEvent> {
@Override
- protected Runnable getScheduledPublished(EventBus m_eventBus) {
+ protected Runnable getScheduledPublisher(EventBus m_eventBus) {
return new HostComponentsEventRunnable(m_eventBus);
}
@@ -44,12 +45,16 @@ public class HostComponentUpdateEventPublisher extends BufferedUpdateEventPublis
@Override
public void run() {
- List<HostComponentUpdate> hostComponentUpdates = retrieveBuffer();
+ List<HostComponentsUpdateEvent> hostComponentUpdateEvents = retrieveBuffer();
+ if (hostComponentUpdateEvents.isEmpty()) {
+ return;
+ }
+ List<HostComponentUpdate> hostComponentUpdates = hostComponentUpdateEvents.stream().flatMap(
+ u -> u.getHostComponentUpdates().stream()).collect(Collectors.toList());
HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
//TODO add logging and metrics posting
eventBus.post(resultEvents);
- resetCollecting();
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
index 7bf1290..8f45859 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/ServiceUpdateEventPublisher.java
@@ -30,7 +30,7 @@ import com.google.inject.Singleton;
public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<ServiceUpdateEvent> {
@Override
- protected Runnable getScheduledPublished(EventBus m_eventBus) {
+ protected Runnable getScheduledPublisher(EventBus m_eventBus) {
return new ServiceEventRunnable(m_eventBus);
}
@@ -45,6 +45,9 @@ public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<Se
@Override
public void run() {
List<ServiceUpdateEvent> serviceUpdates = retrieveBuffer();
+ if (serviceUpdates.isEmpty()) {
+ return;
+ }
List<ServiceUpdateEvent> filtered = new ArrayList<>();
for (ServiceUpdateEvent event : serviceUpdates) {
int pos = filtered.indexOf(event);
@@ -62,7 +65,6 @@ public class ServiceUpdateEventPublisher extends BufferedUpdateEventPublisher<Se
for (ServiceUpdateEvent serviceUpdateEvent : serviceUpdates) {
eventBus.post(serviceUpdateEvent);
}
- resetCollecting();
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3e98bf7/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
index 7d343a5..80c9813 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
@@ -17,7 +17,6 @@
*/
package org.apache.ambari.server.events.publishers;
-import java.util.Collections;
import java.util.concurrent.Executors;
import org.apache.ambari.server.events.AmbariUpdateEvent;
@@ -53,9 +52,9 @@ public class StateUpdateEventPublisher {
if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) {
requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus);
} else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) {
- hostComponentUpdateEventPublisher.publish(((HostComponentsUpdateEvent) event).getHostComponentUpdates(), m_eventBus);
+ hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus);
} else if (event.getType().equals(AmbariUpdateEvent.Type.SERVICE)) {
- serviceUpdateEventPublisher.publish(Collections.singletonList((ServiceUpdateEvent) event), m_eventBus);
+ serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, m_eventBus);
} else {
m_eventBus.post(event);
}