You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/10/19 19:06:10 UTC
[6/8] ambari git commit: Merge branch 'branch-feature-AMBARI-18456' into trunk
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 2f7d6b9..84697b8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -30,8 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -173,34 +174,32 @@ public class ClusterImpl implements Cluster {
private StackId desiredStackVersion;
- private volatile boolean desiredStackVersionSet = true;
-
- private volatile Map<String, Service> services = null;
+ private final ConcurrentSkipListMap<String, Service> services = new ConcurrentSkipListMap<>();
/**
* [ Config Type -> [ Config Version Tag -> Config ] ]
*/
- private volatile Map<String, Map<String, Config>> allConfigs;
+ private final ConcurrentMap<String, ConcurrentMap<String, Config>> allConfigs = new ConcurrentHashMap<>();
/**
* [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... ] ] ] ]
*/
- private Map<String, Map<String, Map<String, ServiceComponentHost>>> serviceComponentHosts;
+ private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ServiceComponentHost>>> serviceComponentHosts = new ConcurrentHashMap<>();
/**
* [ HostName -> [ ... ] ]
*/
- private Map<String, List<ServiceComponentHost>> serviceComponentHostsByHost;
+ private final ConcurrentMap<String, List<ServiceComponentHost>> serviceComponentHostsByHost = new ConcurrentHashMap<>();
/**
* Map of existing config groups
*/
- private volatile Map<Long, ConfigGroup> clusterConfigGroups;
+ private final Map<Long, ConfigGroup> clusterConfigGroups = new ConcurrentHashMap<>();
/**
* Map of Request schedules for this cluster
*/
- private volatile Map<Long, RequestExecution> requestExecutions;
+ private final Map<Long, RequestExecution> requestExecutions = new ConcurrentHashMap<>();
private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock();
@@ -287,8 +286,6 @@ public class ClusterImpl implements Cluster {
@Inject
private StackDAO stackDAO;
- private volatile boolean svcHostsLoaded = false;
-
private volatile Multimap<String, String> serviceConfigTypes;
/**
@@ -311,20 +308,21 @@ public class ClusterImpl implements Cluster {
private Map<String, String> m_clusterPropertyCache = new ConcurrentHashMap<>();
@Inject
- public ClusterImpl(@Assisted ClusterEntity clusterEntity,
- Injector injector, AmbariEventPublisher eventPublisher) throws AmbariException {
- injector.injectMembers(this);
+ public ClusterImpl(@Assisted ClusterEntity clusterEntity, Injector injector,
+ AmbariEventPublisher eventPublisher)
+ throws AmbariException {
clusterId = clusterEntity.getClusterId();
clusterName = clusterEntity.getClusterName();
- serviceComponentHosts = new HashMap<>();
-
- serviceComponentHostsByHost = new HashMap<>();
-
- desiredStackVersion = new StackId(clusterEntity.getDesiredStack());
+ injector.injectMembers(this);
+ loadStackVersion();
+ loadServices();
+ loadServiceHostComponents();
+ loadConfigGroups();
cacheConfigurations();
+ loadRequestExecutions();
if (desiredStackVersion != null && !StringUtils.isEmpty(desiredStackVersion.getStackName()) && !
StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
@@ -336,12 +334,6 @@ public class ClusterImpl implements Cluster {
this.eventPublisher = eventPublisher;
}
-
- @Override
- public ReadWriteLock getClusterGlobalLock() {
- return clusterGlobalLock;
- }
-
private void loadServiceConfigTypes() throws AmbariException {
try {
serviceConfigTypes = collectServiceConfigTypesMapping();
@@ -382,301 +374,196 @@ public class ClusterImpl implements Cluster {
* Make sure we load all the service host components.
* We need this for live status checks.
*/
- public void loadServiceHostComponents() {
- loadServices();
- if (svcHostsLoaded) {
- return;
- }
-
- clusterGlobalLock.writeLock().lock();
-
- try {
- LOG.info("Loading Service Host Components");
- if (svcHostsLoaded) {
- return;
- }
- if (services != null) {
- for (Entry<String, Service> serviceKV : services.entrySet()) {
- /* get all the service component hosts **/
- Service service = serviceKV.getValue();
- if (!serviceComponentHosts.containsKey(service.getName())) {
- serviceComponentHosts.put(service.getName(),
- new HashMap<String, Map<String, ServiceComponentHost>>());
+ private void loadServiceHostComponents() {
+ for (Entry<String, Service> serviceKV : services.entrySet()) {
+ /* get all the service component hosts **/
+ Service service = serviceKV.getValue();
+ if (!serviceComponentHosts.containsKey(service.getName())) {
+ serviceComponentHosts.put(service.getName(),
+ new ConcurrentHashMap<String, ConcurrentMap<String, ServiceComponentHost>>());
+ }
+
+ for (Entry<String, ServiceComponent> svcComponent : service.getServiceComponents().entrySet()) {
+ ServiceComponent comp = svcComponent.getValue();
+ String componentName = svcComponent.getKey();
+ if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) {
+ serviceComponentHosts.get(service.getName()).put(componentName,
+ new ConcurrentHashMap<String, ServiceComponentHost>());
+ }
+
+ /** Get Service Host Components **/
+ for (Entry<String, ServiceComponentHost> svchost : comp.getServiceComponentHosts().entrySet()) {
+ String hostname = svchost.getKey();
+ ServiceComponentHost svcHostComponent = svchost.getValue();
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ serviceComponentHostsByHost.put(hostname,
+ new CopyOnWriteArrayList<ServiceComponentHost>());
}
- for (Entry<String, ServiceComponent> svcComponent : service.getServiceComponents().entrySet()) {
- ServiceComponent comp = svcComponent.getValue();
- String componentName = svcComponent.getKey();
- if (!serviceComponentHosts.get(service.getName()).containsKey(
- componentName)) {
- serviceComponentHosts.get(service.getName()).put(componentName,
- new HashMap<String, ServiceComponentHost>());
- }
- /** Get Service Host Components **/
- for (Entry<String, ServiceComponentHost> svchost : comp.getServiceComponentHosts().entrySet()) {
- String hostname = svchost.getKey();
- ServiceComponentHost svcHostComponent = svchost.getValue();
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- serviceComponentHostsByHost.put(hostname,
- new ArrayList<ServiceComponentHost>());
- }
- List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
- compList.add(svcHostComponent);
- if (!serviceComponentHosts.get(service.getName()).get(
- componentName).containsKey(hostname)) {
- serviceComponentHosts.get(service.getName()).get(componentName).put(
- hostname, svcHostComponent);
- }
- }
+ List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
+ compList.add(svcHostComponent);
+
+ if (!serviceComponentHosts.get(service.getName()).get(componentName).containsKey(
+ hostname)) {
+ serviceComponentHosts.get(service.getName()).get(componentName).put(hostname,
+ svcHostComponent);
}
}
}
- svcHostsLoaded = true;
- } finally {
- clusterGlobalLock.writeLock().unlock();
}
}
private void loadServices() {
- if (services == null) {
- clusterGlobalLock.writeLock().lock();
+ ClusterEntity clusterEntity = getClusterEntity();
+ if (clusterEntity.getClusterServiceEntities().isEmpty()) {
+ return;
+ }
+ for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
+ StackId stackId = getCurrentStackVersion();
try {
- if (services == null) {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- services = new TreeMap<String, Service>();
-
- if (!clusterEntity.getClusterServiceEntities().isEmpty()) {
- for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
- StackId stackId = getCurrentStackVersion();
- try {
- if (ambariMetaInfo.getService(stackId.getStackName(),
- stackId.getStackVersion(), serviceEntity.getServiceName()) != null) {
- services.put(serviceEntity.getServiceName(),
- serviceFactory.createExisting(this, serviceEntity));
- }
- } catch (AmbariException e) {
- LOG.error(String.format(
- "Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s",
- stackId.getStackName(), stackId.getStackVersion(),
- serviceEntity.getServiceName()));
- }
- }
- }
- }
+ if (ambariMetaInfo.getService(stackId.getStackName(),
+ stackId.getStackVersion(), serviceEntity.getServiceName()) != null) {
+ services.put(serviceEntity.getServiceName(),
+ serviceFactory.createExisting(this, serviceEntity));
}
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ } catch (AmbariException e) {
+ LOG.error(String.format(
+ "Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s",
+ stackId.getStackName(), stackId.getStackVersion(),
+ serviceEntity.getServiceName()));
}
}
}
private void loadConfigGroups() {
- if (clusterConfigGroups == null) {
- clusterGlobalLock.writeLock().lock();
-
- try {
- if (clusterConfigGroups == null) {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- clusterConfigGroups = new HashMap<Long, ConfigGroup>();
- if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
- for (ConfigGroupEntity configGroupEntity : clusterEntity.getConfigGroupEntities()) {
- clusterConfigGroups.put(configGroupEntity.getGroupId(),
- configGroupFactory.createExisting(this, configGroupEntity));
- }
- }
- }
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ ClusterEntity clusterEntity = getClusterEntity();
+ if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
+ for (ConfigGroupEntity configGroupEntity : clusterEntity.getConfigGroupEntities()) {
+ clusterConfigGroups.put(configGroupEntity.getGroupId(),
+ configGroupFactory.createExisting(this, configGroupEntity));
}
}
}
private void loadRequestExecutions() {
- if (requestExecutions == null) {
- clusterGlobalLock.writeLock().lock();
- try {
- if (requestExecutions == null) {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- requestExecutions = new HashMap<Long, RequestExecution>();
- if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
- for (RequestScheduleEntity scheduleEntity : clusterEntity.getRequestScheduleEntities()) {
- requestExecutions.put(scheduleEntity.getScheduleId(),
- requestExecutionFactory.createExisting(this, scheduleEntity));
- }
- }
- }
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ ClusterEntity clusterEntity = getClusterEntity();
+ if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
+ for (RequestScheduleEntity scheduleEntity : clusterEntity.getRequestScheduleEntities()) {
+ requestExecutions.put(scheduleEntity.getScheduleId(),
+ requestExecutionFactory.createExisting(this, scheduleEntity));
}
}
}
@Override
public void addConfigGroup(ConfigGroup configGroup) throws AmbariException {
- loadConfigGroups();
- clusterGlobalLock.writeLock().lock();
- try {
- String hostList = "";
- if(LOG.isDebugEnabled()) {
- if (configGroup.getHosts() != null) {
- for (Host host : configGroup.getHosts().values()) {
- hostList += host.getHostName() + ", ";
- }
+ String hostList = "";
+ if(LOG.isDebugEnabled()) {
+ if (configGroup.getHosts() != null) {
+ for (Host host : configGroup.getHosts().values()) {
+ hostList += host.getHostName() + ", ";
}
}
+ }
- LOG.debug("Adding a new Config group" + ", clusterName = "
- + getClusterName() + ", groupName = " + configGroup.getName()
- + ", tag = " + configGroup.getTag() + " with hosts " + hostList);
-
- if (clusterConfigGroups.containsKey(configGroup.getId())) {
- // The loadConfigGroups will load all groups to memory
- LOG.debug("Config group already exists" + ", clusterName = "
- + getClusterName() + ", groupName = " + configGroup.getName()
- + ", groupId = " + configGroup.getId() + ", tag = "
- + configGroup.getTag());
- } else {
- clusterConfigGroups.put(configGroup.getId(), configGroup);
- }
+ LOG.debug("Adding a new Config group" + ", clusterName = "
+ + getClusterName() + ", groupName = " + configGroup.getName()
+ + ", tag = " + configGroup.getTag() + " with hosts " + hostList);
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ if (clusterConfigGroups.containsKey(configGroup.getId())) {
+ // The loadConfigGroups will load all groups to memory
+ LOG.debug("Config group already exists" + ", clusterName = "
+ + getClusterName() + ", groupName = " + configGroup.getName()
+ + ", groupId = " + configGroup.getId() + ", tag = "
+ + configGroup.getTag());
+ } else {
+ clusterConfigGroups.put(configGroup.getId(), configGroup);
}
}
@Override
public Map<Long, ConfigGroup> getConfigGroups() {
- loadConfigGroups();
- clusterGlobalLock.readLock().lock();
- try {
- return Collections.unmodifiableMap(clusterConfigGroups);
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return Collections.unmodifiableMap(clusterConfigGroups);
}
@Override
public Map<Long, ConfigGroup> getConfigGroupsByHostname(String hostname)
throws AmbariException {
- loadConfigGroups();
Map<Long, ConfigGroup> configGroups = new HashMap<Long, ConfigGroup>();
- clusterGlobalLock.readLock().lock();
- try {
- for (Entry<Long, ConfigGroup> groupEntry : clusterConfigGroups.entrySet()) {
- Long id = groupEntry.getKey();
- ConfigGroup group = groupEntry.getValue();
- for (Host host : group.getHosts().values()) {
- if (StringUtils.equals(hostname, host.getHostName())) {
- configGroups.put(id, group);
- break;
- }
+ for (Entry<Long, ConfigGroup> groupEntry : clusterConfigGroups.entrySet()) {
+ Long id = groupEntry.getKey();
+ ConfigGroup group = groupEntry.getValue();
+ for (Host host : group.getHosts().values()) {
+ if (StringUtils.equals(hostname, host.getHostName())) {
+ configGroups.put(id, group);
+ break;
}
}
- } finally {
- clusterGlobalLock.readLock().unlock();
}
return configGroups;
}
@Override
public void addRequestExecution(RequestExecution requestExecution) throws AmbariException {
- loadRequestExecutions();
- clusterGlobalLock.writeLock().lock();
- try {
- LOG.info("Adding a new request schedule" + ", clusterName = "
- + getClusterName() + ", id = " + requestExecution.getId()
- + ", description = " + requestExecution.getDescription());
-
- if (requestExecutions.containsKey(requestExecution.getId())) {
- LOG.debug("Request schedule already exists" + ", clusterName = "
- + getClusterName() + ", id = " + requestExecution.getId()
- + ", description = " + requestExecution.getDescription());
- } else {
- requestExecutions.put(requestExecution.getId(), requestExecution);
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ LOG.info("Adding a new request schedule" + ", clusterName = " + getClusterName() + ", id = "
+ + requestExecution.getId() + ", description = " + requestExecution.getDescription());
+
+ if (requestExecutions.containsKey(requestExecution.getId())) {
+ LOG.debug(
+ "Request schedule already exists" + ", clusterName = " + getClusterName() + ", id = "
+ + requestExecution.getId() + ", description = " + requestExecution.getDescription());
+ } else {
+ requestExecutions.put(requestExecution.getId(), requestExecution);
}
}
@Override
public Map<Long, RequestExecution> getAllRequestExecutions() {
- loadRequestExecutions();
- clusterGlobalLock.readLock().lock();
- try {
- return Collections.unmodifiableMap(requestExecutions);
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return Collections.unmodifiableMap(requestExecutions);
}
@Override
public void deleteRequestExecution(Long id) throws AmbariException {
- loadRequestExecutions();
- clusterGlobalLock.writeLock().lock();
- try {
- RequestExecution requestExecution = requestExecutions.get(id);
- if (requestExecution == null) {
- throw new AmbariException("Request schedule does not exists, "
- + "id = " + id);
- }
- LOG.info("Deleting request schedule" + ", clusterName = "
- + getClusterName() + ", id = " + requestExecution.getId()
- + ", description = " + requestExecution.getDescription());
-
- requestExecution.delete();
- requestExecutions.remove(id);
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ RequestExecution requestExecution = requestExecutions.get(id);
+ if (requestExecution == null) {
+ throw new AmbariException("Request schedule does not exists, " + "id = " + id);
}
+ LOG.info("Deleting request schedule" + ", clusterName = " + getClusterName() + ", id = "
+ + requestExecution.getId() + ", description = " + requestExecution.getDescription());
+
+ requestExecution.delete();
+ requestExecutions.remove(id);
}
@Override
public void deleteConfigGroup(Long id) throws AmbariException, AuthorizationException {
- loadConfigGroups();
- clusterGlobalLock.writeLock().lock();
- try {
- ConfigGroup configGroup = clusterConfigGroups.get(id);
- if (configGroup == null) {
- throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
- }
+ ConfigGroup configGroup = clusterConfigGroups.get(id);
+ if (configGroup == null) {
+ throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
+ }
- LOG.debug("Deleting Config group" + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName() + ", groupId = "
- + configGroup.getId() + ", tag = " + configGroup.getTag());
+ LOG.debug("Deleting Config group" + ", clusterName = " + getClusterName()
+ + ", groupName = " + configGroup.getName() + ", groupId = "
+ + configGroup.getId() + ", tag = " + configGroup.getTag());
- configGroup.delete();
- clusterConfigGroups.remove(id);
- } finally {
- clusterGlobalLock.writeLock().unlock();
- }
+ configGroup.delete();
+ clusterConfigGroups.remove(id);
}
public ServiceComponentHost getServiceComponentHost(String serviceName,
String serviceComponentName, String hostname) throws AmbariException {
- loadServiceHostComponents();
- clusterGlobalLock.readLock().lock();
- try {
- if (!serviceComponentHosts.containsKey(serviceName)
- || !serviceComponentHosts.get(serviceName).containsKey(
- serviceComponentName)
- || !serviceComponentHosts.get(serviceName).get(serviceComponentName).containsKey(
- hostname)) {
- throw new ServiceComponentHostNotFoundException(getClusterName(),
- serviceName, serviceComponentName, hostname);
- }
- return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(
- hostname);
- } finally {
- clusterGlobalLock.readLock().unlock();
+ if (!serviceComponentHosts.containsKey(serviceName)
+ || !serviceComponentHosts.get(serviceName).containsKey(
+ serviceComponentName)
+ || !serviceComponentHosts.get(serviceName).get(serviceComponentName).containsKey(
+ hostname)) {
+ throw new ServiceComponentHostNotFoundException(getClusterName(),
+ serviceName, serviceComponentName, hostname);
}
+ return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(
+ hostname);
}
@Override
@@ -687,21 +574,14 @@ public class ClusterImpl implements Cluster {
@Override
public void setClusterName(String clusterName) {
String oldName = null;
- clusterGlobalLock.writeLock().lock();
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- oldName = clusterEntity.getClusterName();
- clusterEntity.setClusterName(clusterName);
+ ClusterEntity clusterEntity = getClusterEntity();
+ oldName = clusterEntity.getClusterName();
+ clusterEntity.setClusterName(clusterName);
- // RollbackException possibility if UNIQUE constraint violated
- clusterEntity = clusterDAO.merge(clusterEntity);
- clusters.updateClusterName(oldName, clusterName);
- this.clusterName = clusterName;
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
- }
+ // RollbackException possibility if UNIQUE constraint violated
+ clusterEntity = clusterDAO.merge(clusterEntity);
+ clusters.updateClusterName(oldName, clusterName);
+ this.clusterName = clusterName;
// if the name changed, fire an event
if (!StringUtils.equals(oldName, clusterName)) {
@@ -713,38 +593,25 @@ public class ClusterImpl implements Cluster {
@Override
public Long getResourceId() {
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- ResourceEntity resourceEntity = clusterEntity.getResource();
- if (resourceEntity == null) {
- LOG.warn("There is no resource associated with this cluster:\n\tCluster Name: {}\n\tCluster ID: {}",
+
+ ResourceEntity resourceEntity = clusterEntity.getResource();
+ if (resourceEntity == null) {
+ LOG.warn(
+ "There is no resource associated with this cluster:\n\tCluster Name: {}\n\tCluster ID: {}",
getClusterName(), getClusterId());
- return null;
- } else {
- return resourceEntity.getId();
- }
+ return null;
+ } else {
+ return resourceEntity.getId();
}
- return null;
}
@Override
- public void addServiceComponentHosts(Collection<ServiceComponentHost> serviceComponentHosts) throws AmbariException {
- clusterGlobalLock.writeLock().lock();
- try {
- for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
- Service service = getService(serviceComponentHost.getServiceName());
- ServiceComponent serviceComponent = service.getServiceComponent(serviceComponentHost.getServiceComponentName());
- serviceComponent.addServiceComponentHost(serviceComponentHost);
- }
- persistServiceComponentHosts(serviceComponentHosts);
- } finally {
- clusterGlobalLock.writeLock().unlock();
- }
- }
-
@Transactional
- void persistServiceComponentHosts(Collection<ServiceComponentHost> serviceComponentHosts) {
+ public void addServiceComponentHosts(Collection<ServiceComponentHost> serviceComponentHosts) throws AmbariException {
for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
- serviceComponentHost.persist();
+ Service service = getService(serviceComponentHost.getServiceName());
+ ServiceComponent serviceComponent = service.getServiceComponent(serviceComponentHost.getServiceComponentName());
+ serviceComponent.addServiceComponentHost(serviceComponentHost);
}
}
@@ -756,68 +623,61 @@ public class ClusterImpl implements Cluster {
svcCompHost.getHostName());
}
- loadServiceHostComponents();
-
final String hostname = svcCompHost.getHostName();
final String serviceName = svcCompHost.getServiceName();
final String componentName = svcCompHost.getServiceComponentName();
Set<Cluster> cs = clusters.getClustersForHost(hostname);
- clusterGlobalLock.writeLock().lock();
-
- try {
- boolean clusterFound = false;
- Iterator<Cluster> iter = cs.iterator();
- while (iter.hasNext()) {
- Cluster c = iter.next();
- if (c.getClusterId() == getClusterId()) {
- clusterFound = true;
- break;
- }
- }
-
- if (!clusterFound) {
- throw new AmbariException("Host does not belong this cluster"
- + ", hostname=" + hostname + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId());
+ boolean clusterFound = false;
+ Iterator<Cluster> iter = cs.iterator();
+ while (iter.hasNext()) {
+ Cluster c = iter.next();
+ if (c.getClusterId() == getClusterId()) {
+ clusterFound = true;
+ break;
}
+ }
- if (!serviceComponentHosts.containsKey(serviceName)) {
- serviceComponentHosts.put(serviceName,
- new HashMap<String, Map<String, ServiceComponentHost>>());
- }
+ if (!clusterFound) {
+ throw new AmbariException("Host does not belong this cluster"
+ + ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ + ", clusterId=" + getClusterId());
+ }
- if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
- serviceComponentHosts.get(serviceName).put(componentName,
- new HashMap<String, ServiceComponentHost>());
- }
+ if (!serviceComponentHosts.containsKey(serviceName)) {
+ serviceComponentHosts.put(serviceName,
+ new ConcurrentHashMap<String, ConcurrentMap<String, ServiceComponentHost>>());
+ }
- if (serviceComponentHosts.get(serviceName).get(componentName).containsKey(
- hostname)) {
- throw new AmbariException("Duplicate entry for ServiceComponentHost"
- + ", serviceName=" + serviceName + ", serviceComponentName"
- + componentName + ", hostname= " + hostname);
- }
+ if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
+ serviceComponentHosts.get(serviceName).put(componentName,
+ new ConcurrentHashMap<String, ServiceComponentHost>());
+ }
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- serviceComponentHostsByHost.put(hostname,
- new ArrayList<ServiceComponentHost>());
- }
+ if (serviceComponentHosts.get(serviceName).get(componentName).containsKey(
+ hostname)) {
+ throw new AmbariException("Duplicate entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new ServiceComponentHost" + ", clusterName="
- + getClusterName() + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName + ", serviceComponentName"
- + componentName + ", hostname= " + hostname);
- }
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ serviceComponentHostsByHost.put(hostname,
+ new CopyOnWriteArrayList<ServiceComponentHost>());
+ }
- serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
- svcCompHost);
- serviceComponentHostsByHost.get(hostname).add(svcCompHost);
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new ServiceComponentHost" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
}
+
+ serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
+ svcCompHost);
+
+ serviceComponentHostsByHost.get(hostname).add(svcCompHost);
}
@Override
@@ -830,76 +690,69 @@ public class ClusterImpl implements Cluster {
svcCompHost.getHostName());
}
- loadServiceHostComponents();
-
final String hostname = svcCompHost.getHostName();
final String serviceName = svcCompHost.getServiceName();
final String componentName = svcCompHost.getServiceComponentName();
Set<Cluster> cs = clusters.getClustersForHost(hostname);
- clusterGlobalLock.writeLock().lock();
- try {
- boolean clusterFound = false;
- Iterator<Cluster> iter = cs.iterator();
- while (iter.hasNext()) {
- Cluster c = iter.next();
- if (c.getClusterId() == getClusterId()) {
- clusterFound = true;
- break;
- }
+ boolean clusterFound = false;
+ Iterator<Cluster> iter = cs.iterator();
+ while (iter.hasNext()) {
+ Cluster c = iter.next();
+ if (c.getClusterId() == getClusterId()) {
+ clusterFound = true;
+ break;
}
+ }
- if (!clusterFound) {
- throw new AmbariException("Host does not belong this cluster"
- + ", hostname=" + hostname + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId());
- }
+ if (!clusterFound) {
+ throw new AmbariException("Host does not belong this cluster"
+ + ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ + ", clusterId=" + getClusterId());
+ }
- if (!serviceComponentHosts.containsKey(serviceName)
- || !serviceComponentHosts.get(serviceName).containsKey(componentName)
- || !serviceComponentHosts.get(serviceName).get(componentName).containsKey(
- hostname)) {
- throw new AmbariException("Invalid entry for ServiceComponentHost"
- + ", serviceName=" + serviceName + ", serviceComponentName"
- + componentName + ", hostname= " + hostname);
- }
+ if (!serviceComponentHosts.containsKey(serviceName)
+ || !serviceComponentHosts.get(serviceName).containsKey(componentName)
+ || !serviceComponentHosts.get(serviceName).get(componentName).containsKey(
+ hostname)) {
+ throw new AmbariException("Invalid entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- throw new AmbariException("Invalid host entry for ServiceComponentHost"
- + ", serviceName=" + serviceName + ", serviceComponentName"
- + componentName + ", hostname= " + hostname);
- }
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ throw new AmbariException("Invalid host entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- ServiceComponentHost schToRemove = null;
- for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
- if (sch.getServiceName().equals(serviceName)
- && sch.getServiceComponentName().equals(componentName)
- && sch.getHostName().equals(hostname)) {
- schToRemove = sch;
- break;
- }
+ ServiceComponentHost schToRemove = null;
+ for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
+ if (sch.getServiceName().equals(serviceName)
+ && sch.getServiceComponentName().equals(componentName)
+ && sch.getHostName().equals(hostname)) {
+ schToRemove = sch;
+ break;
}
+ }
- if (schToRemove == null) {
- LOG.warn("Unavailable in per host cache. ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (schToRemove == null) {
+ LOG.warn("Unavailable in per host cache. ServiceComponentHost"
+ + ", serviceName=" + serviceName
+ + ", serviceComponentName" + componentName
+ + ", hostname= " + hostname);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing a ServiceComponentHost" + ", clusterName="
- + getClusterName() + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName + ", serviceComponentName"
- + componentName + ", hostname= " + hostname);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing a ServiceComponentHost" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
- if (schToRemove != null) {
- serviceComponentHostsByHost.get(hostname).remove(schToRemove);
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
+ if (schToRemove != null) {
+ serviceComponentHostsByHost.get(hostname).remove(schToRemove);
}
}
@@ -911,19 +764,13 @@ public class ClusterImpl implements Cluster {
}
@Override
- public List<ServiceComponentHost> getServiceComponentHosts(
- String hostname) {
- loadServiceHostComponents();
- clusterGlobalLock.readLock().lock();
- try {
- if (serviceComponentHostsByHost.containsKey(hostname)) {
- return new CopyOnWriteArrayList<ServiceComponentHost>(
- serviceComponentHostsByHost.get(hostname));
- }
- return new ArrayList<ServiceComponentHost>();
- } finally {
- clusterGlobalLock.readLock().unlock();
+ public List<ServiceComponentHost> getServiceComponentHosts(String hostname) {
+ List<ServiceComponentHost> serviceComponentHosts = serviceComponentHostsByHost.get(hostname);
+ if (null != serviceComponentHosts) {
+ return new CopyOnWriteArrayList<ServiceComponentHost>(serviceComponentHosts);
}
+
+ return new ArrayList<ServiceComponentHost>();
}
@Override
@@ -967,21 +814,16 @@ public class ClusterImpl implements Cluster {
public List<ServiceComponentHost> getServiceComponentHosts(String serviceName, String componentName) {
ArrayList<ServiceComponentHost> foundItems = new ArrayList<ServiceComponentHost>();
- loadServiceHostComponents();
- clusterGlobalLock.readLock().lock();
- try {
- Map<String, Map<String, ServiceComponentHost>> foundByService = serviceComponentHosts.get(serviceName);
- if (foundByService != null) {
- if (componentName == null) {
- for(Map<String, ServiceComponentHost> foundByComponent :foundByService.values()) {
- foundItems.addAll(foundByComponent.values());
- }
- } else if (foundByService.containsKey(componentName)) {
- foundItems.addAll(foundByService.get(componentName).values());
+ ConcurrentMap<String, ConcurrentMap<String, ServiceComponentHost>> foundByService = serviceComponentHosts.get(
+ serviceName);
+ if (foundByService != null) {
+ if (componentName == null) {
+ for (Map<String, ServiceComponentHost> foundByComponent : foundByService.values()) {
+ foundItems.addAll(foundByComponent.values());
}
+ } else if (foundByService.containsKey(componentName)) {
+ foundItems.addAll(foundByService.get(componentName).values());
}
- } finally {
- clusterGlobalLock.readLock().unlock();
}
return foundItems;
@@ -989,76 +831,44 @@ public class ClusterImpl implements Cluster {
@Override
public void addService(Service service) {
- loadServices();
- clusterGlobalLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId() + ", serviceName="
- + service.getName());
- }
- services.put(service.getName(), service);
- } finally {
- clusterGlobalLock.writeLock().unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName() + ", clusterId="
+ + getClusterId() + ", serviceName=" + service.getName());
}
+ services.put(service.getName(), service);
}
@Override
public Service addService(String serviceName) throws AmbariException {
- loadServices();
- clusterGlobalLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ if (services.containsKey(serviceName)) {
+ throw new AmbariException("Service already exists" + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId() + ", serviceName=" + serviceName);
- }
- if (services.containsKey(serviceName)) {
- throw new AmbariException("Service already exists" + ", clusterName="
- + getClusterName() + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName);
- }
- Service s = serviceFactory.createNew(this, serviceName);
- services.put(s.getName(), s);
- return s;
- } finally {
- clusterGlobalLock.writeLock().unlock();
}
+
+ Service service = serviceFactory.createNew(this, serviceName);
+ addService(service);
+
+ return service;
}
@Override
public Service getService(String serviceName) throws AmbariException {
- loadServices();
- clusterGlobalLock.readLock().lock();
- try {
- if (!services.containsKey(serviceName)) {
- throw new ServiceNotFoundException(getClusterName(), serviceName);
- }
- return services.get(serviceName);
- } finally {
- clusterGlobalLock.readLock().unlock();
+ Service service = services.get(serviceName);
+ if (null == service) {
+ throw new ServiceNotFoundException(getClusterName(), serviceName);
}
+
+ return service;
}
@Override
public Map<String, Service> getServices() {
- loadServices();
- clusterGlobalLock.readLock().lock();
- try {
- return new HashMap<String, Service>(services);
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return new HashMap<String, Service>(services);
}
@Override
public StackId getDesiredStackVersion() {
- loadStackVersion();
- clusterGlobalLock.readLock().lock();
- try {
- return desiredStackVersion;
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return desiredStackVersion;
}
@Override
@@ -1081,25 +891,24 @@ public class ClusterImpl implements Cluster {
StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- clusterEntity.setDesiredStack(stackEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- if (cascade) {
- for (Service service : getServices().values()) {
- service.setDesiredStackVersion(stackId);
+ clusterEntity.setDesiredStack(stackEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
+
+ if (cascade) {
+ for (Service service : getServices().values()) {
+ service.setDesiredStackVersion(stackId);
- for (ServiceComponent sc : service.getServiceComponents().values()) {
- sc.setDesiredStackVersion(stackId);
+ for (ServiceComponent sc : service.getServiceComponents().values()) {
+ sc.setDesiredStackVersion(stackId);
- for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) {
- sch.setDesiredStackVersion(stackId);
- }
+ for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) {
+ sch.setDesiredStackVersion(stackId);
}
}
}
- loadServiceConfigTypes();
}
+ loadServiceConfigTypes();
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -1107,89 +916,55 @@ public class ClusterImpl implements Cluster {
@Override
public StackId getCurrentStackVersion() {
- clusterGlobalLock.readLock().lock();
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
- if (clusterStateEntity != null) {
- StackEntity currentStackEntity = clusterStateEntity.getCurrentStack();
- return new StackId(currentStackEntity);
- }
- }
+ ClusterEntity clusterEntity = getClusterEntity();
- return null;
- } finally {
- clusterGlobalLock.readLock().unlock();
+ ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
+ if (clusterStateEntity != null) {
+ StackEntity currentStackEntity = clusterStateEntity.getCurrentStack();
+ return new StackId(currentStackEntity);
}
+
+ return null;
}
@Override
public State getProvisioningState() {
- clusterGlobalLock.readLock().lock();
State provisioningState = null;
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- provisioningState = clusterEntity.getProvisioningState();
-
- if (null == provisioningState) {
- provisioningState = State.INIT;
- }
- }
+ ClusterEntity clusterEntity = getClusterEntity();
+ provisioningState = clusterEntity.getProvisioningState();
- return provisioningState;
- } finally {
- clusterGlobalLock.readLock().unlock();
+ if (null == provisioningState) {
+ provisioningState = State.INIT;
}
+
+ return provisioningState;
}
@Override
public void setProvisioningState(State provisioningState) {
- clusterGlobalLock.writeLock().lock();
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- clusterEntity.setProvisioningState(provisioningState);
- clusterEntity = clusterDAO.merge(clusterEntity);
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
- }
+ ClusterEntity clusterEntity = getClusterEntity();
+ clusterEntity.setProvisioningState(provisioningState);
+ clusterEntity = clusterDAO.merge(clusterEntity);
}
@Override
public SecurityType getSecurityType() {
- clusterGlobalLock.readLock().lock();
SecurityType securityType = null;
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- securityType = clusterEntity.getSecurityType();
-
- if (null == securityType) {
- securityType = SecurityType.NONE;
- }
- }
+ ClusterEntity clusterEntity = getClusterEntity();
+ securityType = clusterEntity.getSecurityType();
- return securityType;
- } finally {
- clusterGlobalLock.readLock().unlock();
+ if (null == securityType) {
+ securityType = SecurityType.NONE;
}
+
+ return securityType;
}
@Override
public void setSecurityType(SecurityType securityType) {
- clusterGlobalLock.writeLock().lock();
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- clusterEntity.setSecurityType(securityType);
- clusterEntity = clusterDAO.merge(clusterEntity);
- }
- } finally {
- clusterGlobalLock.writeLock().unlock();
- }
+ ClusterEntity clusterEntity = getClusterEntity();
+ clusterEntity.setSecurityType(securityType);
+ clusterEntity = clusterDAO.merge(clusterEntity);
}
/**
@@ -1865,129 +1640,127 @@ public class ClusterImpl implements Cluster {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- ClusterVersionEntity existingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
+ ClusterVersionEntity existingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
getClusterName(), stackId, version);
- if (existingClusterVersion == null) {
- throw new AmbariException(
- "Existing cluster version not found for cluster="
- + getClusterName() + ", stack=" + stackId + ", version="
- + version);
- }
+ if (existingClusterVersion == null) {
+ throw new AmbariException("Existing cluster version not found for cluster="
+ + getClusterName() + ", stack=" + stackId + ", version=" + version);
+ }
- // NOOP
- if (existingClusterVersion.getState() == state) {
- return;
- }
+ // NOOP
+ if (existingClusterVersion.getState() == state) {
+ return;
+ }
- switch (existingClusterVersion.getState()) {
- case CURRENT:
- // If CURRENT state is changed here cluster will not have CURRENT
- // state.
- // CURRENT state will be changed to INSTALLED when another CURRENT
- // state is added.
- // allowedStates.add(RepositoryVersionState.INSTALLED);
- break;
- case INSTALLING:
- allowedStates.add(RepositoryVersionState.INSTALLED);
- allowedStates.add(RepositoryVersionState.INSTALL_FAILED);
- allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
- if (clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()) == null) {
- allowedStates.add(RepositoryVersionState.CURRENT);
- }
- break;
- case INSTALL_FAILED:
- allowedStates.add(RepositoryVersionState.INSTALLING);
- break;
- case INSTALLED:
- allowedStates.add(RepositoryVersionState.INSTALLING);
- allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
- allowedStates.add(RepositoryVersionState.CURRENT);
- break;
- case OUT_OF_SYNC:
- allowedStates.add(RepositoryVersionState.INSTALLING);
- break;
- case INIT:
+ switch (existingClusterVersion.getState()) {
+ case CURRENT:
+ // If CURRENT state is changed here cluster will not have CURRENT
+ // state.
+ // CURRENT state will be changed to INSTALLED when another CURRENT
+ // state is added.
+ // allowedStates.add(RepositoryVersionState.INSTALLED);
+ break;
+ case INSTALLING:
+ allowedStates.add(RepositoryVersionState.INSTALLED);
+ allowedStates.add(RepositoryVersionState.INSTALL_FAILED);
+ allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
+ if (clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()) == null) {
allowedStates.add(RepositoryVersionState.CURRENT);
- break;
- }
+ }
+ break;
+ case INSTALL_FAILED:
+ allowedStates.add(RepositoryVersionState.INSTALLING);
+ break;
+ case INSTALLED:
+ allowedStates.add(RepositoryVersionState.INSTALLING);
+ allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
+ allowedStates.add(RepositoryVersionState.CURRENT);
+ break;
+ case OUT_OF_SYNC:
+ allowedStates.add(RepositoryVersionState.INSTALLING);
+ break;
+ case INIT:
+ allowedStates.add(RepositoryVersionState.CURRENT);
+ break;
+ }
- if (!allowedStates.contains(state)) {
- throw new AmbariException("Invalid cluster version transition from "
+ if (!allowedStates.contains(state)) {
+ throw new AmbariException("Invalid cluster version transition from "
+ existingClusterVersion.getState() + " to " + state);
- }
+ }
- // There must be at most one cluster version whose state is CURRENT at
- // all times.
- if (state == RepositoryVersionState.CURRENT) {
- ClusterVersionEntity currentVersion = clusterVersionDAO.findByClusterAndStateCurrent(getClusterName());
- if (currentVersion != null) {
- currentVersion.setState(RepositoryVersionState.INSTALLED);
- currentVersion = clusterVersionDAO.merge(currentVersion);
- }
+ // There must be at most one cluster version whose state is CURRENT at
+ // all times.
+ if (state == RepositoryVersionState.CURRENT) {
+ ClusterVersionEntity currentVersion = clusterVersionDAO.findByClusterAndStateCurrent(
+ getClusterName());
+ if (currentVersion != null) {
+ currentVersion.setState(RepositoryVersionState.INSTALLED);
+ currentVersion = clusterVersionDAO.merge(currentVersion);
}
+ }
- existingClusterVersion.setState(state);
- existingClusterVersion.setEndTime(System.currentTimeMillis());
- existingClusterVersion = clusterVersionDAO.merge(existingClusterVersion);
+ existingClusterVersion.setState(state);
+ existingClusterVersion.setEndTime(System.currentTimeMillis());
+ existingClusterVersion = clusterVersionDAO.merge(existingClusterVersion);
- if (state == RepositoryVersionState.CURRENT) {
- for (HostEntity hostEntity : clusterEntity.getHostEntities()) {
- if (hostHasReportables(existingClusterVersion.getRepositoryVersion(), hostEntity)) {
- continue;
- }
+ if (state == RepositoryVersionState.CURRENT) {
+ for (HostEntity hostEntity : clusterEntity.getHostEntities()) {
+ if (hostHasReportables(existingClusterVersion.getRepositoryVersion(), hostEntity)) {
+ continue;
+ }
- Collection<HostVersionEntity> versions = hostVersionDAO.findByHost(hostEntity.getHostName());
+ Collection<HostVersionEntity> versions = hostVersionDAO.findByHost(
+ hostEntity.getHostName());
- HostVersionEntity target = null;
- if (null != versions) {
- // Set anything that was previously marked CURRENT as INSTALLED, and
- // the matching version as CURRENT
- for (HostVersionEntity entity : versions) {
- if (entity.getRepositoryVersion().getId().equals(
+ HostVersionEntity target = null;
+ if (null != versions) {
+ // Set anything that was previously marked CURRENT as INSTALLED, and
+ // the matching version as CURRENT
+ for (HostVersionEntity entity : versions) {
+ if (entity.getRepositoryVersion().getId().equals(
existingClusterVersion.getRepositoryVersion().getId())) {
- target = entity;
- target.setState(state);
- target = hostVersionDAO.merge(target);
- } else if (entity.getState() == RepositoryVersionState.CURRENT) {
- entity.setState(RepositoryVersionState.INSTALLED);
- entity = hostVersionDAO.merge(entity);
- }
+ target = entity;
+ target.setState(state);
+ target = hostVersionDAO.merge(target);
+ } else if (entity.getState() == RepositoryVersionState.CURRENT) {
+ entity.setState(RepositoryVersionState.INSTALLED);
+ entity = hostVersionDAO.merge(entity);
}
}
+ }
- if (null == target) {
- // If no matching version was found, create one with the desired
- // state
- HostVersionEntity hve = new HostVersionEntity(hostEntity,
+ if (null == target) {
+ // If no matching version was found, create one with the desired
+ // state
+ HostVersionEntity hve = new HostVersionEntity(hostEntity,
existingClusterVersion.getRepositoryVersion(), state);
- LOG.info("Creating host version for {}, state={}, repo={} (repo_id={})",
- hve.getHostName(), hve.getState(),
- hve.getRepositoryVersion().getVersion(), hve.getRepositoryVersion().getId());
+ LOG.info("Creating host version for {}, state={}, repo={} (repo_id={})",
+ hve.getHostName(), hve.getState(), hve.getRepositoryVersion().getVersion(),
+ hve.getRepositoryVersion().getId());
- hostVersionDAO.create(hve);
- }
+ hostVersionDAO.create(hve);
}
+ }
- // when setting the cluster's state to current, we must also
- // bring the desired stack and current stack in line with each other
- StackEntity desiredStackEntity = clusterEntity.getDesiredStack();
- StackId desiredStackId = new StackId(desiredStackEntity);
+ // when setting the cluster's state to current, we must also
+ // bring the desired stack and current stack in line with each other
+ StackEntity desiredStackEntity = clusterEntity.getDesiredStack();
+ StackId desiredStackId = new StackId(desiredStackEntity);
- // if the desired stack ID doesn't match the target when setting the
- // cluster to CURRENT, then there's a problem
- if (!desiredStackId.equals(stackId)) {
- String message = MessageFormat.format(
+ // if the desired stack ID doesn't match the target when setting the
+ // cluster to CURRENT, then there's a problem
+ if (!desiredStackId.equals(stackId)) {
+ String message = MessageFormat.format(
"The desired stack ID {0} must match {1} when transitioning the cluster''s state to {2}",
desiredStackId, stackId, RepositoryVersionState.CURRENT);
- throw new AmbariException(message);
- }
-
- setCurrentStackVersion(stackId);
+ throw new AmbariException(message);
}
+
+ setCurrentStackVersion(stackId);
}
} catch (RollbackException e) {
String message = MessageFormat.format(
@@ -2036,22 +1809,21 @@ public class ClusterImpl implements Cluster {
stackId.getStackVersion());
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId());
- if (clusterStateEntity == null) {
- clusterStateEntity = new ClusterStateEntity();
- clusterStateEntity.setClusterId(clusterEntity.getClusterId());
- clusterStateEntity.setCurrentStack(stackEntity);
- clusterStateEntity.setClusterEntity(clusterEntity);
- clusterStateDAO.create(clusterStateEntity);
- clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
- clusterEntity.setClusterStateEntity(clusterStateEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- } else {
- clusterStateEntity.setCurrentStack(stackEntity);
- clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- }
+ ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(
+ clusterEntity.getClusterId());
+ if (clusterStateEntity == null) {
+ clusterStateEntity = new ClusterStateEntity();
+ clusterStateEntity.setClusterId(clusterEntity.getClusterId());
+ clusterStateEntity.setCurrentStack(stackEntity);
+ clusterStateEntity.setClusterEntity(clusterEntity);
+ clusterStateDAO.create(clusterStateEntity);
+ clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
+ clusterEntity.setClusterStateEntity(clusterStateEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
+ } else {
+ clusterStateEntity.setCurrentStack(stackEntity);
+ clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
}
} catch (RollbackException e) {
LOG.warn("Unable to set version " + stackId + " for cluster "
@@ -2065,7 +1837,6 @@ public class ClusterImpl implements Cluster {
@Override
public Map<String, Config> getConfigsByType(String configType) {
- loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)) {
@@ -2080,7 +1851,6 @@ public class ClusterImpl implements Cluster {
@Override
public Config getConfig(String configType, String versionTag) {
- loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)
@@ -2100,11 +1870,13 @@ public class ClusterImpl implements Cluster {
if (!allConfigs.containsKey(configType)) {
return null;
}
- for(Map.Entry<String, Config> entry: allConfigs.get(configType).entrySet()) {
- if(entry.getValue().getVersion().equals(configVersion)) {
+
+ for (Map.Entry<String, Config> entry : allConfigs.get(configType).entrySet()) {
+ if (entry.getValue().getVersion().equals(configVersion)) {
return entry.getValue();
}
}
+
return null;
} finally {
clusterGlobalLock.readLock().unlock();
@@ -2113,14 +1885,14 @@ public class ClusterImpl implements Cluster {
@Override
public void addConfig(Config config) {
- loadConfigurations();
+ if (config.getType() == null || config.getType().isEmpty()) {
+ throw new IllegalArgumentException("Config type cannot be empty");
+ }
+
clusterGlobalLock.writeLock().lock();
try {
- if (config.getType() == null || config.getType().isEmpty()) {
- throw new IllegalArgumentException("Config type cannot be empty");
- }
if (!allConfigs.containsKey(config.getType())) {
- allConfigs.put(config.getType(), new HashMap<String, Config>());
+ allConfigs.put(config.getType(), new ConcurrentHashMap<String, Config>());
}
allConfigs.get(config.getType()).put(config.getTag(), config);
@@ -2131,11 +1903,10 @@ public class ClusterImpl implements Cluster {
@Override
public Collection<Config> getAllConfigs() {
- loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
List<Config> list = new ArrayList<Config>();
- for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) {
+ for (Entry<String, ConcurrentMap<String, Config>> entry : allConfigs.entrySet()) {
for (Config config : entry.getValue().values()) {
list.add(config);
}
@@ -2149,44 +1920,31 @@ public class ClusterImpl implements Cluster {
@Override
public ClusterResponse convertToResponse()
throws AmbariException {
- loadStackVersion();
String clusterName = getClusterName();
Map<String, Host> hosts = clusters.getHostsForCluster(clusterName);
- clusterGlobalLock.readLock().lock();
- try {
- return new ClusterResponse(getClusterId(), clusterName,
- getProvisioningState(), getSecurityType(), hosts.keySet(),
- hosts.size(), getDesiredStackVersion().getStackId(),
- getClusterHealthReport(hosts));
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+
+ return new ClusterResponse(getClusterId(), clusterName,
+ getProvisioningState(), getSecurityType(), hosts.keySet(),
+ hosts.size(), getDesiredStackVersion().getStackId(),
+ getClusterHealthReport(hosts));
}
@Override
public void debugDump(StringBuilder sb) {
- loadServices();
- loadStackVersion();
- clusterGlobalLock.readLock().lock();
- try {
- sb.append("Cluster={ clusterName=").append(getClusterName()).append(
- ", clusterId=").append(getClusterId()).append(
- ", desiredStackVersion=").append(desiredStackVersion.getStackId()).append(
- ", services=[ ");
- boolean first = true;
- for (Service s : services.values()) {
- if (!first) {
- sb.append(" , ");
- }
- first = false;
- sb.append("\n ");
- s.debugDump(sb);
- sb.append(' ');
+ sb.append("Cluster={ clusterName=").append(getClusterName()).append(", clusterId=").append(
+ getClusterId()).append(", desiredStackVersion=").append(
+ desiredStackVersion.getStackId()).append(", services=[ ");
+ boolean first = true;
+ for (Service s : services.values()) {
+ if (!first) {
+ sb.append(" , ");
}
- sb.append(" ] }");
- } finally {
- clusterGlobalLock.readLock().unlock();
+ first = false;
+ sb.append("\n ");
+ s.debugDump(sb);
+ sb.append(' ');
}
+ sb.append(" ] }");
}
@Override
@@ -2204,7 +1962,6 @@ public class ClusterImpl implements Cluster {
@Override
@Transactional
public void deleteAllServices() throws AmbariException {
- loadServices();
clusterGlobalLock.writeLock().lock();
try {
LOG.info("Deleting all services for cluster" + ", clusterName="
@@ -2230,7 +1987,6 @@ public class ClusterImpl implements Cluster {
@Override
public void deleteService(String serviceName)
throws AmbariException {
- loadServices();
clusterGlobalLock.writeLock().lock();
try {
Service service = getService(serviceName);
@@ -2279,7 +2035,6 @@ public class ClusterImpl implements Cluster {
@Override
public boolean canBeRemoved() {
- loadServices();
clusterGlobalLock.readLock().lock();
try {
boolean safeToRemove = true;
@@ -2394,7 +2149,6 @@ public class ClusterImpl implements Cluster {
* @return a map of type-to-configuration information.
*/
private Map<String, Set<DesiredConfig>> getDesiredConfigs(boolean allVersions) {
- loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
Map<String, Set<DesiredConfig>> map = new HashMap<>();
@@ -2480,37 +2234,38 @@ public class ClusterImpl implements Cluster {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- // set config group
- if (configGroup != null) {
- serviceConfigEntity.setGroupId(configGroup.getId());
- Collection<Config> configs = configGroup.getConfigurations().values();
- List<ClusterConfigEntity> configEntities = new ArrayList<ClusterConfigEntity>(configs.size());
- for (Config config : configs) {
- configEntities.add(clusterDAO.findConfig(getClusterId(), config.getType(), config.getTag()));
- }
-
- serviceConfigEntity.setClusterConfigEntities(configEntities);
- } else {
- List<ClusterConfigEntity> configEntities = getClusterConfigEntitiesByService(serviceName);
- serviceConfigEntity.setClusterConfigEntities(configEntities);
+ // set config group
+ if (configGroup != null) {
+ serviceConfigEntity.setGroupId(configGroup.getId());
+ Collection<Config> configs = configGroup.getConfigurations().values();
+ List<ClusterConfigEntity> configEntities = new ArrayList<ClusterConfigEntity>(
+ configs.size());
+ for (Config config : configs) {
+ configEntities.add(
+ clusterDAO.findConfig(getClusterId(), config.getType(), config.getTag()));
}
+ serviceConfigEntity.setClusterConfigEntities(configEntities);
+ } else {
+ List<ClusterConfigEntity> configEntities = getClusterConfigEntitiesByService(serviceName);
+ serviceConfigEntity.setClusterConfigEntities(configEntities);
+ }
- long nextServiceConfigVersion = serviceConfigDAO.findNextServiceConfigVersion(clusterId, serviceName);
- serviceConfigEntity.setServiceName(serviceName);
- serviceConfigEntity.setClusterEntity(clusterEntity);
- serviceConfigEntity.setVersion(nextServiceConfigVersion);
- serviceConfigEntity.setUser(user);
- serviceConfigEntity.setNote(note);
- serviceConfigEntity.setStack(clusterEntity.getDesiredStack());
+ long nextServiceConfigVersion = serviceConfigDAO.findNextServiceConfigVersion(clusterId,
+ serviceName);
- serviceConfigDAO.create(serviceConfigEntity);
- if (configGroup != null) {
- serviceConfigEntity.setHostIds(new ArrayList<Long>(configGroup.getHosts().keySet()));
- serviceConfigEntity = serviceConfigDAO.merge(serviceConfigEntity);
- }
+ serviceConfigEntity.setServiceName(serviceName);
+ serviceConfigEntity.setClusterEntity(clusterEntity);
+ serviceConfigEntity.setVersion(nextServiceConfigVersion);
+ serviceConfigEntity.setUser(user);
+ serviceConfigEntity.setNote(note);
+ serviceConfigEntity.setStack(clusterEntity.getDesiredStack());
+
+ serviceConfigDAO.create(serviceConfigEntity);
+ if (configGroup != null) {
+ serviceConfigEntity.setHostIds(new ArrayList<Long>(configGroup.getHosts().keySet()));
+ serviceConfigEntity = serviceConfigDAO.merge(serviceConfigEntity);
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -2603,7 +2358,6 @@ public class ClusterImpl implements Cluster {
@Override
public List<ServiceConfigVersionResponse> getServiceConfigVersions() {
- loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>();
@@ -2932,35 +2686,26 @@ public class ClusterImpl implements Cluster {
@Override
public Config getDesiredConfigByType(String configType) {
- loadConfigurations();
- clusterGlobalLock.readLock().lock();
- try {
- for (ClusterConfigMappingEntity e : clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId())) {
- if (e.isSelected() > 0 && e.getType().equals(configType)) {
- return getConfig(e.getType(), e.getTag());
- }
+ for (ClusterConfigMappingEntity e : clusterDAO.getClusterConfigMappingEntitiesByCluster(
+ getClusterId())) {
+ if (e.isSelected() > 0 && e.getType().equals(configType)) {
+ return getConfig(e.getType(), e.getTag());
}
-
- return null;
- } finally {
- clusterGlobalLock.readLock().unlock();
}
+
+ return null;
}
@Override
public boolean isConfigTypeExists(String configType) {
- clusterGlobalLock.readLock().lock();
- try {
- for (ClusterConfigMappingEntity e : clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId())) {
- if (e.getType().equals(configType)) {
- return true;
- }
+ for (ClusterConfigMappingEntity e : clusterDAO.getClusterConfigMappingEntitiesByCluster(
+ getClusterId())) {
+ if (e.getType().equals(configType)) {
+ return true;
}
-
- return false;
- } finally {
- clusterGlobalLock.readLock().unlock();
}
+
+ return false;
}
@Override
@@ -3131,13 +2876,13 @@ public class ClusterImpl implements Cluster {
*/
@Override
public Set<String> getHosts(String serviceName, String componentName) {
- Map<String, Service> services = getServices();
+ Map<String, Service> clusterServices = getServices();
- if (!services.containsKey(serviceName)) {
+ if (!clusterServices.containsKey(serviceName)) {
return Collections.emptySet();
}
- Service service = services.get(serviceName);
+ Service service = clusterServices.get(serviceName);
Map<String, ServiceComponent> components = service.getServiceComponents();
if (!components.containsKey(componentName) ||
@@ -3256,16 +3001,14 @@ public class ClusterImpl implements Cluster {
@Override
public boolean checkPermission(PrivilegeEntity privilegeEntity, boolean readOnly) {
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- ResourceEntity resourceEntity = clusterEntity.getResource();
- if (resourceEntity != null) {
- Integer permissionId = privilegeEntity.getPermission().getId();
- // CLUSTER.USER or CLUSTER.ADMINISTRATOR for the given cluster resource.
- if (privilegeEntity.getResource().equals(resourceEntity)) {
- if ((readOnly && permissionId.equals(PermissionEntity.CLUSTER_USER_PERMISSION)) ||
- permissionId.equals(PermissionEntity.CLUSTER_ADMINISTRATOR_PERMISSION)) {
- return true;
- }
+ ResourceEntity resourceEntity = clusterEntity.getResource();
+ if (resourceEntity != null) {
+ Integer permissionId = privilegeEntity.getPermission().getId();
+ // CLUSTER.USER or CLUSTER.ADMINISTRATOR for the given cluster resource.
+ if (privilegeEntity.getResource().equals(resourceEntity)) {
+ if ((readOnly && permissionId.equals(PermissionEntity.CLUSTER_USER_PERMISSION))
+ || permissionId.equals(PermissionEntity.CLUSTER_ADMINISTRATOR_PERMISSION)) {
+ return true;
}
}
}
@@ -3531,19 +3274,16 @@ public class ClusterImpl implements Cluster {
* Caches all of the {@link ClusterConfigEntity}s in {@link #allConfigs}.
*/
private void cacheConfigurations() {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- if (null == allConfigs) {
- allConfigs = new HashMap<String, Map<String, Config>>();
- }
-
+ clusterGlobalLock.writeLock().lock();
+ try {
+ ClusterEntity clusterEntity = getClusterEntity();
allConfigs.clear();
if (!clusterEntity.getClusterConfigEntities().isEmpty()) {
for (ClusterConfigEntity entity : clusterEntity.getClusterConfigEntities()) {
if (!allConfigs.containsKey(entity.getType())) {
- allConfigs.put(entity.getType(), new HashMap<String, Config>());
+ allConfigs.put(entity.getType(), new ConcurrentHashMap<String, Config>());
}
Config config = configFactory.createExisting(this, entity);
@@ -3551,80 +3291,23 @@ public class ClusterImpl implements Cluster {
allConfigs.get(entity.getType()).put(entity.getTag(), config);
}
}
- }
- }
-
- private void loadConfigurations() {
- if (allConfigs != null) {
- return;
- }
- clusterGlobalLock.writeLock().lock();
- try {
- if (allConfigs != null) {
- return;
- }
- cacheConfigurations();
-
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
private void loadStackVersion() {
- if (desiredStackVersionSet) {
- return;
- }
- clusterGlobalLock.writeLock().lock();
- try {
-
- if (desiredStackVersionSet) {
- return;
- }
-
- desiredStackVersion = new StackId(getClusterEntity().getDesiredStack());
+ desiredStackVersion = new StackId(getClusterEntity().getDesiredStack());
- if (!StringUtils.isEmpty(desiredStackVersion.getStackName()) && !
- StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
- try {
- loadServiceConfigTypes();
- } catch (AmbariException e) {
- //TODO recheck wrapping exception here, required for lazy loading after invalidation
- throw new RuntimeException(e);
- }
+ if (!StringUtils.isEmpty(desiredStackVersion.getStackName())
+ && !StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
+ try {
+ loadServiceConfigTypes();
+ } catch (AmbariException e) {
+ // TODO recheck wrapping exception here, required for lazy loading after
+ // invalidation
+ throw new RuntimeException(e);
}
-
- desiredStackVersionSet = true;
-
- } finally {
- clusterGlobalLock.writeLock().unlock();
- }
-
- }
-
- /**
- * Purpose of this method is to clear all cached data to re-read it from database.
- * To be used in case of desync.
- */
- @Override
- public void invalidateData() {
- clusterGlobalLock.writeLock().lock();
- try {
- allConfigs = null;
- services = null;
- desiredStackVersionSet = false;
-
- serviceComponentHosts.clear();
- serviceComponentHostsByHost.clear();
- svcHostsLoaded = false;
-
- clusterConfigGroups = null;
-
- //TODO investigate reset request executions, it has separate api which is not too heavy
-
- refresh();
-
- } finally {
- clusterGlobalLock.writeLock().unlock();
}
}
@@ -3674,17 +3357,8 @@ public class ClusterImpl implements Cluster {
*/
@Override
public UpgradeEntity getUpgradeEntity() {
- clusterGlobalLock.readLock().lock();
- try {
- ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- return clusterEntity.getUpgradeEntity();
- } else {
- return null;
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ ClusterEntity clusterEntity = getClusterEntity();
+ return clusterEntity.getUpgradeEntity();
}
/**
@@ -3693,20 +3367,15 @@ public class ClusterImpl implements Cluster {
@Override
@Transactional
public void setUpgradeEntity(UpgradeEntity upgradeEntity) throws AmbariException {
- clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
- if (clusterEntity != null) {
- clusterEntity.setUpgradeEntity(upgradeEntity);
- clusterDAO.merge(clusterEntity);
- }
+ clusterEntity.setUpgradeEntity(upgradeEntity);
+ clusterDAO.merge(clusterEntity);
} catch (RollbackException e) {
String msg = "Unable to set upgrade entiry " + upgradeEntity + " for cluster "
+ getClusterName();
LOG.warn(msg);
throw new AmbariException(msg, e);
- } finally {
- clusterGlobalLock.writeLock().unlock();
}
}
@@ -3778,4 +3447,5 @@ public class ClusterImpl implements Cluster {
m_clusterPropertyCache.clear();
}
+
}