You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/01/29 00:33:02 UTC
[2/3] ambari git commit: AMBARI-9368 - Deadlock Between Dependent
Cluster/Service/Component/Host Implementations (jonathanhurley)
http://git-wip-us.apache.org/repos/asf/ambari/blob/23d506e2/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 0bfaa19..2328bed 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -30,15 +30,14 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.persistence.RollbackException;
import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.DuplicateResourceException;
import org.apache.ambari.server.ConfigGroupNotFoundException;
+import org.apache.ambari.server.DuplicateResourceException;
import org.apache.ambari.server.ObjectNotFoundException;
import org.apache.ambari.server.ParentObjectNotFoundException;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
@@ -82,7 +81,6 @@ import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
import org.apache.ambari.server.orm.entities.ResourceEntity;
import org.apache.ambari.server.orm.entities.ServiceConfigEntity;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
-import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.ClusterHealthReport;
import org.apache.ambari.server.state.Clusters;
@@ -118,10 +116,10 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Injector;
@@ -174,16 +172,10 @@ public class ClusterImpl implements Cluster {
*/
private Map<Long, RequestExecution> requestExecutions;
- private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private Lock readLock = readWriteLock.readLock();
- private Lock writeLock = readWriteLock.writeLock();
-
private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock();
private ClusterEntity clusterEntity;
- private Set<Alert> clusterAlerts = new HashSet<Alert>();
-
private final ConfigVersionHelper configVersionHelper;
@Inject
@@ -332,116 +324,101 @@ public class ClusterImpl implements Cluster {
if (svcHostsLoaded) {
return;
}
+
clusterGlobalLock.writeLock().lock();
+
try {
- writeLock.lock();
- try {
- LOG.info("Loading Service Host Components");
- if (svcHostsLoaded) {
- return;
- }
- if (services != null) {
- for (Entry<String, Service> serviceKV : services.entrySet()) {
+ LOG.info("Loading Service Host Components");
+ if (svcHostsLoaded) {
+ return;
+ }
+ if (services != null) {
+ for (Entry<String, Service> serviceKV : services.entrySet()) {
/* get all the service component hosts **/
- Service service = serviceKV.getValue();
- if (!serviceComponentHosts.containsKey(service.getName())) {
- serviceComponentHosts.put(service.getName(), new HashMap<String,
- Map<String, ServiceComponentHost>>());
- }
- for (Entry<String, ServiceComponent> svcComponent :
- service.getServiceComponents().entrySet()) {
- ServiceComponent comp = svcComponent.getValue();
- String componentName = svcComponent.getKey();
- if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) {
- serviceComponentHosts.get(service.getName()).put(componentName,
+ Service service = serviceKV.getValue();
+ if (!serviceComponentHosts.containsKey(service.getName())) {
+ serviceComponentHosts.put(service.getName(),
+ new HashMap<String, Map<String, ServiceComponentHost>>());
+ }
+ for (Entry<String, ServiceComponent> svcComponent : service.getServiceComponents().entrySet()) {
+ ServiceComponent comp = svcComponent.getValue();
+ String componentName = svcComponent.getKey();
+ if (!serviceComponentHosts.get(service.getName()).containsKey(
+ componentName)) {
+ serviceComponentHosts.get(service.getName()).put(componentName,
new HashMap<String, ServiceComponentHost>());
- }
- /** Get Service Host Components **/
- for (Entry<String, ServiceComponentHost> svchost :
- comp.getServiceComponentHosts().entrySet()) {
- String hostname = svchost.getKey();
- ServiceComponentHost svcHostComponent = svchost.getValue();
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- serviceComponentHostsByHost.put(hostname,
+ }
+ /** Get Service Host Components **/
+ for (Entry<String, ServiceComponentHost> svchost : comp.getServiceComponentHosts().entrySet()) {
+ String hostname = svchost.getKey();
+ ServiceComponentHost svcHostComponent = svchost.getValue();
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ serviceComponentHostsByHost.put(hostname,
new ArrayList<ServiceComponentHost>());
- }
- List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
- compList.add(svcHostComponent);
+ }
+ List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
+ compList.add(svcHostComponent);
- if (!serviceComponentHosts.get(service.getName()).get(componentName)
- .containsKey(hostname)) {
- serviceComponentHosts.get(service.getName()).get(componentName)
- .put(hostname, svcHostComponent);
- }
+ if (!serviceComponentHosts.get(service.getName()).get(
+ componentName).containsKey(hostname)) {
+ serviceComponentHosts.get(service.getName()).get(componentName).put(
+ hostname, svcHostComponent);
}
}
}
}
- svcHostsLoaded = true;
- } finally {
- writeLock.unlock();
}
+ svcHostsLoaded = true;
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
private void loadServices() {
- //logging here takes too much time
-// LOG.info("clusterEntity " + clusterEntity.getClusterServiceEntities() );
if (services == null) {
clusterGlobalLock.writeLock().lock();
+
try {
- writeLock.lock();
- try {
- if (services == null) {
- services = new TreeMap<String, Service>();
- if (!clusterEntity.getClusterServiceEntities().isEmpty()) {
- for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
- StackId stackId = getCurrentStackVersion();
- try {
- if (ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(),
- serviceEntity.getServiceName()) != null) {
- services.put(serviceEntity.getServiceName(), serviceFactory.createExisting(this, serviceEntity));
- }
- } catch (AmbariException e) {
- LOG.error(String.format("Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s",
- stackId.getStackName(), stackId.getStackVersion(),
- serviceEntity.getServiceName()));
- e.printStackTrace();
+ if (services == null) {
+ services = new TreeMap<String, Service>();
+ if (!clusterEntity.getClusterServiceEntities().isEmpty()) {
+ for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
+ StackId stackId = getCurrentStackVersion();
+ try {
+ if (ambariMetaInfo.getService(stackId.getStackName(),
+ stackId.getStackVersion(), serviceEntity.getServiceName()) != null) {
+ services.put(serviceEntity.getServiceName(),
+ serviceFactory.createExisting(this, serviceEntity));
}
+ } catch (AmbariException e) {
+ LOG.error(String.format(
+ "Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s",
+ stackId.getStackName(), stackId.getStackVersion(),
+ serviceEntity.getServiceName()));
+ e.printStackTrace();
}
}
}
- } finally {
- writeLock.unlock();
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
}
private void loadConfigGroups() {
if (clusterConfigGroups == null) {
clusterGlobalLock.writeLock().lock();
+
try {
- writeLock.lock();
- try {
- if (clusterConfigGroups == null) {
- clusterConfigGroups = new HashMap<Long, ConfigGroup>();
- if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
- for (ConfigGroupEntity configGroupEntity :
- clusterEntity.getConfigGroupEntities()) {
- clusterConfigGroups.put(configGroupEntity.getGroupId(),
+ if (clusterConfigGroups == null) {
+ clusterConfigGroups = new HashMap<Long, ConfigGroup>();
+ if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
+ for (ConfigGroupEntity configGroupEntity : clusterEntity.getConfigGroupEntities()) {
+ clusterConfigGroups.put(configGroupEntity.getGroupId(),
configGroupFactory.createExisting(this, configGroupEntity));
- }
}
}
- } finally {
- writeLock.unlock();
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -453,20 +430,14 @@ public class ClusterImpl implements Cluster {
if (requestExecutions == null) {
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (requestExecutions == null) {
- requestExecutions = new HashMap<Long, RequestExecution>();
- if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
- for (RequestScheduleEntity scheduleEntity : clusterEntity
- .getRequestScheduleEntities()) {
- requestExecutions.put(scheduleEntity.getScheduleId(),
+ if (requestExecutions == null) {
+ requestExecutions = new HashMap<Long, RequestExecution>();
+ if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
+ for (RequestScheduleEntity scheduleEntity : clusterEntity.getRequestScheduleEntities()) {
+ requestExecutions.put(scheduleEntity.getScheduleId(),
requestExecutionFactory.createExisting(this, scheduleEntity));
- }
}
}
- } finally {
- writeLock.unlock();
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -479,28 +450,21 @@ public class ClusterImpl implements Cluster {
loadConfigGroups();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- LOG.debug("Adding a new Config group"
- + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName()
+ LOG.debug("Adding a new Config group" + ", clusterName = "
+ + getClusterName() + ", groupName = " + configGroup.getName()
+ ", tag = " + configGroup.getTag());
- if (clusterConfigGroups.containsKey(configGroup.getId())) {
- // The loadConfigGroups will load all groups to memory
- LOG.debug("Config group already exists"
- + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName()
- + ", groupId = " + configGroup.getId()
- + ", tag = " + configGroup.getTag());
- } else {
- clusterConfigGroups.put(configGroup.getId(), configGroup);
- configHelper.invalidateStaleConfigsCache();
- }
-
- } finally {
- writeLock.unlock();
+ if (clusterConfigGroups.containsKey(configGroup.getId())) {
+ // The loadConfigGroups will load all groups to memory
+ LOG.debug("Config group already exists" + ", clusterName = "
+ + getClusterName() + ", groupName = " + configGroup.getName()
+ + ", groupId = " + configGroup.getId() + ", tag = "
+ + configGroup.getTag());
+ } else {
+ clusterConfigGroups.put(configGroup.getId(), configGroup);
+ configHelper.invalidateStaleConfigsCache();
}
+
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -511,12 +475,7 @@ public class ClusterImpl implements Cluster {
loadConfigGroups();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return Collections.unmodifiableMap(clusterConfigGroups);
- } finally {
- readLock.unlock();
- }
+ return Collections.unmodifiableMap(clusterConfigGroups);
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -530,23 +489,19 @@ public class ClusterImpl implements Cluster {
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- Set<ConfigGroupHostMapping> hostMappingEntities = configGroupHostMappingDAO.findByHost(hostname);
-
- if (hostMappingEntities != null && !hostMappingEntities.isEmpty()) {
- for (ConfigGroupHostMapping entity : hostMappingEntities) {
- ConfigGroup configGroup = configGroupMap.get(entity.getConfigGroupId());
- if (configGroup != null && !configGroups.containsKey(configGroup.getId())) {
- configGroups.put(configGroup.getId(), configGroup);
- }
+ Set<ConfigGroupHostMapping> hostMappingEntities = configGroupHostMappingDAO.findByHost(hostname);
+
+ if (hostMappingEntities != null && !hostMappingEntities.isEmpty()) {
+ for (ConfigGroupHostMapping entity : hostMappingEntities) {
+ ConfigGroup configGroup = configGroupMap.get(entity.getConfigGroupId());
+ if (configGroup != null
+ && !configGroups.containsKey(configGroup.getId())) {
+ configGroups.put(configGroup.getId(), configGroup);
}
}
- return configGroups;
-
- } finally {
- readLock.unlock();
}
+ return configGroups;
+
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -557,23 +512,16 @@ public class ClusterImpl implements Cluster {
loadRequestExecutions();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- LOG.info("Adding a new request schedule"
- + ", clusterName = " + getClusterName()
- + ", id = " + requestExecution.getId()
- + ", description = " + requestExecution.getDescription());
+ LOG.info("Adding a new request schedule" + ", clusterName = "
+ + getClusterName() + ", id = " + requestExecution.getId()
+ + ", description = " + requestExecution.getDescription());
- if (requestExecutions.containsKey(requestExecution.getId())) {
- LOG.debug("Request schedule already exists"
- + ", clusterName = " + getClusterName()
- + ", id = " + requestExecution.getId()
+ if (requestExecutions.containsKey(requestExecution.getId())) {
+ LOG.debug("Request schedule already exists" + ", clusterName = "
+ + getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
- } else {
- requestExecutions.put(requestExecution.getId(), requestExecution);
- }
- } finally {
- writeLock.unlock();
+ } else {
+ requestExecutions.put(requestExecution.getId(), requestExecution);
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -585,12 +533,7 @@ public class ClusterImpl implements Cluster {
loadRequestExecutions();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return Collections.unmodifiableMap(requestExecutions);
- } finally {
- readLock.unlock();
- }
+ return Collections.unmodifiableMap(requestExecutions);
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -601,23 +544,17 @@ public class ClusterImpl implements Cluster {
loadRequestExecutions();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- RequestExecution requestExecution = requestExecutions.get(id);
- if (requestExecution == null) {
- throw new AmbariException("Request schedule does not exists, " +
- "id = " + id);
- }
- LOG.info("Deleting request schedule"
- + ", clusterName = " + getClusterName()
- + ", id = " + requestExecution.getId()
+ RequestExecution requestExecution = requestExecutions.get(id);
+ if (requestExecution == null) {
+ throw new AmbariException("Request schedule does not exists, "
+ + "id = " + id);
+ }
+ LOG.info("Deleting request schedule" + ", clusterName = "
+ + getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
- requestExecution.delete();
- requestExecutions.remove(id);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ requestExecution.delete();
+ requestExecutions.remove(id);
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -628,165 +565,133 @@ public class ClusterImpl implements Cluster {
loadConfigGroups();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- ConfigGroup configGroup = clusterConfigGroups.get(id);
- if (configGroup == null) {
- throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
- }
- LOG.debug("Deleting Config group"
- + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName()
- + ", groupId = " + configGroup.getId()
- + ", tag = " + configGroup.getTag());
-
- configGroup.delete();
- clusterConfigGroups.remove(id);
- configHelper.invalidateStaleConfigsCache();
- } finally {
- readWriteLock.writeLock().unlock();
+ ConfigGroup configGroup = clusterConfigGroups.get(id);
+ if (configGroup == null) {
+ throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
}
+ LOG.debug("Deleting Config group" + ", clusterName = " + getClusterName()
+ + ", groupName = " + configGroup.getName() + ", groupId = "
+ + configGroup.getId() + ", tag = " + configGroup.getTag());
+
+ configGroup.delete();
+ clusterConfigGroups.remove(id);
+ configHelper.invalidateStaleConfigsCache();
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
public ServiceComponentHost getServiceComponentHost(String serviceName,
- String serviceComponentName, String hostname) throws AmbariException {
+ String serviceComponentName, String hostname) throws AmbariException {
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- if (!serviceComponentHosts.containsKey(serviceName)
- || !serviceComponentHosts.get(serviceName)
- .containsKey(serviceComponentName)
- || !serviceComponentHosts.get(serviceName).get(serviceComponentName)
- .containsKey(hostname)) {
- throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName,
- serviceComponentName, hostname);
- }
- return serviceComponentHosts.get(serviceName).get(serviceComponentName)
- .get(hostname);
- } finally {
- readLock.unlock();
- }
+ if (!serviceComponentHosts.containsKey(serviceName)
+ || !serviceComponentHosts.get(serviceName).containsKey(
+ serviceComponentName)
+ || !serviceComponentHosts.get(serviceName).get(serviceComponentName).containsKey(
+ hostname)) {
+ throw new ServiceComponentHostNotFoundException(getClusterName(),
+ serviceName, serviceComponentName, hostname);
+ }
+ return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(
+ hostname);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public String getClusterName() {
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return clusterEntity.getClusterName();
- } finally {
- readLock.unlock();
- }
+ return clusterEntity.getClusterName();
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public void setClusterName(String clusterName) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- String oldName = clusterEntity.getClusterName();
- clusterEntity.setClusterName(clusterName);
- clusterDAO.merge(clusterEntity); //RollbackException possibility if UNIQUE constraint violated
- clusters.updateClusterName(oldName, clusterName);
- } finally {
- writeLock.unlock();
- }
+ String oldName = clusterEntity.getClusterName();
+ clusterEntity.setClusterName(clusterName);
+
+ // RollbackException possibility if UNIQUE constraint violated
+ clusterDAO.merge(clusterEntity);
+ clusters.updateClusterName(oldName, clusterName);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
public void addServiceComponentHost(
ServiceComponentHost svcCompHost) throws AmbariException {
loadServiceHostComponents();
clusterGlobalLock.writeLock().lock();
+
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache"
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache"
+ ", serviceName=" + svcCompHost.getServiceName()
+ ", componentName=" + svcCompHost.getServiceComponentName()
+ ", hostname=" + svcCompHost.getHostName());
- }
+ }
- final String hostname = svcCompHost.getHostName();
- final String serviceName = svcCompHost.getServiceName();
- final String componentName = svcCompHost.getServiceComponentName();
- Set<Cluster> cs = clusters.getClustersForHost(hostname);
- boolean clusterFound = false;
- Iterator<Cluster> iter = cs.iterator();
- while (iter.hasNext()) {
- Cluster c = iter.next();
- if (c.getClusterId() == getClusterId()) {
- clusterFound = true;
- break;
- }
+ final String hostname = svcCompHost.getHostName();
+ final String serviceName = svcCompHost.getServiceName();
+ final String componentName = svcCompHost.getServiceComponentName();
+ Set<Cluster> cs = clusters.getClustersForHost(hostname);
+ boolean clusterFound = false;
+ Iterator<Cluster> iter = cs.iterator();
+ while (iter.hasNext()) {
+ Cluster c = iter.next();
+ if (c.getClusterId() == getClusterId()) {
+ clusterFound = true;
+ break;
}
- if (!clusterFound) {
- throw new AmbariException("Host does not belong this cluster"
- + ", hostname=" + hostname
- + ", clusterName=" + getClusterName()
+ }
+ if (!clusterFound) {
+ throw new AmbariException("Host does not belong this cluster"
+ + ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId());
- }
+ }
- if (!serviceComponentHosts.containsKey(serviceName)) {
- serviceComponentHosts.put(serviceName,
+ if (!serviceComponentHosts.containsKey(serviceName)) {
+ serviceComponentHosts.put(serviceName,
new HashMap<String, Map<String, ServiceComponentHost>>());
- }
- if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
- serviceComponentHosts.get(serviceName).put(componentName,
+ }
+ if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
+ serviceComponentHosts.get(serviceName).put(componentName,
new HashMap<String, ServiceComponentHost>());
- }
+ }
- if (serviceComponentHosts.get(serviceName).get(componentName).
- containsKey(hostname)) {
- throw new AmbariException("Duplicate entry for ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (serviceComponentHosts.get(serviceName).get(componentName).containsKey(
+ hostname)) {
+ throw new AmbariException("Duplicate entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- serviceComponentHostsByHost.put(hostname,
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ serviceComponentHostsByHost.put(hostname,
new ArrayList<ServiceComponentHost>());
- }
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new ServiceComponentHost"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new ServiceComponentHost" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
+ serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
svcCompHost);
- serviceComponentHostsByHost.get(hostname).add(svcCompHost);
- } finally {
- writeLock.unlock();
- }
+ serviceComponentHostsByHost.get(hostname).add(svcCompHost);
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -795,104 +700,85 @@ public class ClusterImpl implements Cluster {
loadServiceHostComponents();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache"
- + ", serviceName=" + svcCompHost.getServiceName()
- + ", componentName=" + svcCompHost.getServiceComponentName()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache"
+ + ", serviceName="
+ + svcCompHost.getServiceName()
+ + ", componentName="
+ + svcCompHost.getServiceComponentName()
+ ", hostname=" + svcCompHost.getHostName());
- }
+ }
- final String hostname = svcCompHost.getHostName();
- final String serviceName = svcCompHost.getServiceName();
- final String componentName = svcCompHost.getServiceComponentName();
- Set<Cluster> cs = clusters.getClustersForHost(hostname);
- boolean clusterFound = false;
- Iterator<Cluster> iter = cs.iterator();
- while (iter.hasNext()) {
- Cluster c = iter.next();
- if (c.getClusterId() == getClusterId()) {
- clusterFound = true;
- break;
- }
+ final String hostname = svcCompHost.getHostName();
+ final String serviceName = svcCompHost.getServiceName();
+ final String componentName = svcCompHost.getServiceComponentName();
+ Set<Cluster> cs = clusters.getClustersForHost(hostname);
+ boolean clusterFound = false;
+ Iterator<Cluster> iter = cs.iterator();
+ while (iter.hasNext()) {
+ Cluster c = iter.next();
+ if (c.getClusterId() == getClusterId()) {
+ clusterFound = true;
+ break;
}
- if (!clusterFound) {
- throw new AmbariException("Host does not belong this cluster"
- + ", hostname=" + hostname
- + ", clusterName=" + getClusterName()
+ }
+
+ if (!clusterFound) {
+ throw new AmbariException("Host does not belong this cluster"
+ + ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId());
- }
+ }
- if (!serviceComponentHosts.containsKey(serviceName)
+ if (!serviceComponentHosts.containsKey(serviceName)
|| !serviceComponentHosts.get(serviceName).containsKey(componentName)
- || !serviceComponentHosts.get(serviceName).get(componentName).
- containsKey(hostname)) {
- throw new AmbariException("Invalid entry for ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- throw new AmbariException("Invalid host entry for ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ || !serviceComponentHosts.get(serviceName).get(componentName).containsKey(
+ hostname)) {
+ throw new AmbariException("Invalid entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ throw new AmbariException("Invalid host entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- ServiceComponentHost schToRemove = null;
- for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
- if (sch.getServiceName().equals(serviceName)
+ ServiceComponentHost schToRemove = null;
+ for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
+ if (sch.getServiceName().equals(serviceName)
&& sch.getServiceComponentName().equals(componentName)
&& sch.getHostName().equals(hostname)) {
- schToRemove = sch;
- break;
- }
- }
-
- if (schToRemove == null) {
- LOG.warn("Unavailable in per host cache. ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
+ schToRemove = sch;
+ break;
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing a ServiceComponentHost"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
+ if (schToRemove == null) {
+ LOG.warn("Unavailable in per host cache. ServiceComponentHost"
+ ", serviceName=" + serviceName
+ ", serviceComponentName" + componentName
+ ", hostname= " + hostname);
- }
+ }
- serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
- if (schToRemove != null) {
- serviceComponentHostsByHost.get(hostname).remove(schToRemove);
- }
- } finally {
- writeLock.unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing a ServiceComponentHost" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
+
+ serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
+ if (schToRemove != null) {
+ serviceComponentHostsByHost.get(hostname).remove(schToRemove);
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public long getClusterId() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return clusterEntity.getClusterId();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
-
+ return clusterEntity.getClusterId();
}
@Override
@@ -901,19 +787,14 @@ public class ClusterImpl implements Cluster {
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- if (serviceComponentHostsByHost.containsKey(hostname)) {
- return new CopyOnWriteArrayList<ServiceComponentHost>(serviceComponentHostsByHost.get(hostname));
- }
- return new ArrayList<ServiceComponentHost>();
- } finally {
- readLock.unlock();
+ if (serviceComponentHostsByHost.containsKey(hostname)) {
+ return new CopyOnWriteArrayList<ServiceComponentHost>(
+ serviceComponentHostsByHost.get(hostname));
}
+ return new ArrayList<ServiceComponentHost>();
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -922,28 +803,20 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new Service"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + service.getName());
- }
- if (services.containsKey(service.getName())) {
- throw new AmbariException("Service already exists"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ + ", clusterId=" + getClusterId() + ", serviceName="
+ + service.getName());
+ }
+ if (services.containsKey(service.getName())) {
+ throw new AmbariException("Service already exists" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + service.getName());
- }
- services.put(service.getName(), service);
- } finally {
- writeLock.unlock();
}
+ services.put(service.getName(), service);
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -951,30 +824,21 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new Service"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName);
- }
- if (services.containsKey(serviceName)) {
- throw new AmbariException("Service already exists"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ + ", clusterId=" + getClusterId() + ", serviceName=" + serviceName);
+ }
+ if (services.containsKey(serviceName)) {
+ throw new AmbariException("Service already exists" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + serviceName);
- }
- Service s = serviceFactory.createNew(this, serviceName);
- services.put(s.getName(), s);
- return s;
- } finally {
- writeLock.unlock();
}
+ Service s = serviceFactory.createNew(this, serviceName);
+ services.put(s.getName(), s);
+ return s;
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -983,19 +847,13 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- if (!services.containsKey(serviceName)) {
- throw new ServiceNotFoundException(getClusterName(), serviceName);
- }
- return services.get(serviceName);
- } finally {
- readLock.unlock();
+ if (!services.containsKey(serviceName)) {
+ throw new ServiceNotFoundException(getClusterName(), serviceName);
}
+ return services.get(serviceName);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -1003,77 +861,53 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return new HashMap<String, Service>(services);
- } finally {
- readLock.unlock();
- }
+ return new HashMap<String, Service>(services);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public StackId getDesiredStackVersion() {
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return desiredStackVersion;
- } finally {
- readLock.unlock();
- }
+ return desiredStackVersion;
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public void setDesiredStackVersion(StackId stackVersion) throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Changing DesiredStackVersion of Cluster"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Changing DesiredStackVersion of Cluster" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ ", currentDesiredStackVersion=" + desiredStackVersion
+ ", newDesiredStackVersion=" + stackVersion);
- }
- desiredStackVersion = stackVersion;
- clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion));
- clusterDAO.merge(clusterEntity);
- loadServiceConfigTypes();
- } finally {
- readWriteLock.writeLock().unlock();
}
+ desiredStackVersion = stackVersion;
+ clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion));
+ clusterDAO.merge(clusterEntity);
+ loadServiceConfigTypes();
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public StackId getCurrentStackVersion() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
- if (clusterStateEntity != null) {
- String stackVersion = clusterStateEntity.getCurrentStackVersion();
- if (stackVersion != null && !stackVersion.isEmpty()) {
- return gson.fromJson(stackVersion, StackId.class);
- }
+ ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
+ if (clusterStateEntity != null) {
+ String stackVersion = clusterStateEntity.getCurrentStackVersion();
+ if (stackVersion != null && !stackVersion.isEmpty()) {
+ return gson.fromJson(stackVersion, StackId.class);
}
- return null;
- } finally {
- readWriteLock.readLock().unlock();
}
+ return null;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1082,20 +916,15 @@ public class ClusterImpl implements Cluster {
@Override
public State getProvisioningState() {
clusterGlobalLock.readLock().lock();
+ State provisioningState = null;
try {
- readLock.lock();
- State provisioningState = null;
- try {
- provisioningState = clusterEntity.getProvisioningState();
-
- if( null == provisioningState ) {
- provisioningState = State.INIT;
- }
+ provisioningState = clusterEntity.getProvisioningState();
- return provisioningState;
- } finally {
- readLock.unlock();
+ if (null == provisioningState) {
+ provisioningState = State.INIT;
}
+
+ return provisioningState;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1103,37 +932,27 @@ public class ClusterImpl implements Cluster {
@Override
public void setProvisioningState(State provisioningState) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- clusterEntity.setProvisioningState(provisioningState);
- clusterDAO.merge(clusterEntity);
- } finally {
- writeLock.unlock();
- }
+ clusterEntity.setProvisioningState(provisioningState);
+ clusterDAO.merge(clusterEntity);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@Override
public SecurityType getSecurityType() {
clusterGlobalLock.readLock().lock();
+ SecurityType securityType = null;
try {
- readLock.lock();
- SecurityType securityType = null;
- try {
- securityType = clusterEntity.getSecurityType();
-
- if( null == securityType ) {
- securityType = SecurityType.NONE;
- }
+ securityType = clusterEntity.getSecurityType();
- return securityType;
- } finally {
- readLock.unlock();
+ if (null == securityType) {
+ securityType = SecurityType.NONE;
}
+
+ return securityType;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1141,17 +960,12 @@ public class ClusterImpl implements Cluster {
@Override
public void setSecurityType(SecurityType securityType) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- clusterEntity.setSecurityType(securityType);
- clusterDAO.merge(clusterEntity);
- } finally {
- writeLock.unlock();
- }
+ clusterEntity.setSecurityType(securityType);
+ clusterDAO.merge(clusterEntity);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@@ -1161,7 +975,7 @@ public class ClusterImpl implements Cluster {
*/
@Override
public ClusterVersionEntity getCurrentClusterVersion() {
- return clusterVersionDAO.findByClusterAndStateCurrent(this.getClusterName());
+ return clusterVersionDAO.findByClusterAndStateCurrent(getClusterName());
}
/**
@@ -1170,7 +984,7 @@ public class ClusterImpl implements Cluster {
*/
@Override
public Collection<ClusterVersionEntity> getAllClusterVersions() {
- return clusterVersionDAO.findByCluster(this.getClusterName());
+ return clusterVersionDAO.findByCluster(getClusterName());
}
/**
@@ -1183,7 +997,7 @@ public class ClusterImpl implements Cluster {
@Override
public void mapHostVersions(Set<String> hostNames, ClusterVersionEntity currentClusterVersion, RepositoryVersionState desiredState) throws AmbariException {
if (currentClusterVersion == null) {
- throw new AmbariException("Could not find current stack version of cluster " + this.getClusterName());
+ throw new AmbariException("Could not find current stack version of cluster " + getClusterName());
}
final Set<RepositoryVersionState> validStates = new HashSet<RepositoryVersionState>(){{
@@ -1194,25 +1008,28 @@ public class ClusterImpl implements Cluster {
throw new AmbariException("The state must be one of [" + StringUtils.join(validStates, ", ") + "]");
}
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- Map<String, HostVersionEntity> existingHostToHostVersionEntity = new HashMap<String, HostVersionEntity>();
- List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(this.getClusterName(),
- currentClusterVersion.getRepositoryVersion().getStack(), currentClusterVersion.getRepositoryVersion().getVersion());
+ Map<String, HostVersionEntity> existingHostToHostVersionEntity = new HashMap<String, HostVersionEntity>();
+ List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(
+ getClusterName(),
+ currentClusterVersion.getRepositoryVersion().getStack(),
+ currentClusterVersion.getRepositoryVersion().getVersion());
- if (existingHostVersionEntities != null) {
- for (HostVersionEntity entity : existingHostVersionEntities) {
- existingHostToHostVersionEntity.put(entity.getHostName(), entity);
- }
+ if (existingHostVersionEntities != null) {
+ for (HostVersionEntity entity : existingHostVersionEntities) {
+ existingHostToHostVersionEntity.put(entity.getHostName(), entity);
}
+ }
- Sets.SetView<String> intersection = Sets.intersection(existingHostToHostVersionEntity.keySet(), hostNames);
+ Sets.SetView<String> intersection = Sets.intersection(
+ existingHostToHostVersionEntity.keySet(), hostNames);
- for (String hostname : hostNames) {
- List<HostVersionEntity> currentHostVersions = hostVersionDAO.findByClusterHostAndState(this.getClusterName(), hostname, RepositoryVersionState.CURRENT);
- HostVersionEntity currentHostVersionEntity = (currentHostVersions != null && currentHostVersions.size() == 1) ? currentHostVersions.get(0) : null;
+ for (String hostname : hostNames) {
+ List<HostVersionEntity> currentHostVersions = hostVersionDAO.findByClusterHostAndState(
+ getClusterName(), hostname, RepositoryVersionState.CURRENT);
+ HostVersionEntity currentHostVersionEntity = (currentHostVersions != null && currentHostVersions.size() == 1) ? currentHostVersions.get(0)
+ : null;
// Notice that if any hosts already have the desired stack and version, regardless of the state, we try
// to be robust and only insert records for the missing hosts.
@@ -1229,18 +1046,20 @@ public class ClusterImpl implements Cluster {
hostVersionDAO.merge(hostVersionEntity);
}
- // Maintain the invariant that only one HostVersionEntity is allowed to have a state of CURRENT.
- if (currentHostVersionEntity != null && !currentHostVersionEntity.getRepositoryVersion().equals(hostVersionEntity.getRepositoryVersion()) && desiredState == RepositoryVersionState.CURRENT && currentHostVersionEntity.getState() == RepositoryVersionState.CURRENT) {
- currentHostVersionEntity.setState(RepositoryVersionState.INSTALLED);
- hostVersionDAO.merge(currentHostVersionEntity);
- }
+ // Maintain the invariant that only one HostVersionEntity is allowed
+ // to have a state of CURRENT.
+ if (currentHostVersionEntity != null
+ && !currentHostVersionEntity.getRepositoryVersion().equals(
+ hostVersionEntity.getRepositoryVersion())
+ && desiredState == RepositoryVersionState.CURRENT
+ && currentHostVersionEntity.getState() == RepositoryVersionState.CURRENT) {
+ currentHostVersionEntity.setState(RepositoryVersionState.INSTALLED);
+ hostVersionDAO.merge(currentHostVersionEntity);
}
}
- } finally {
- readWriteLock.writeLock().unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@@ -1254,7 +1073,7 @@ public class ClusterImpl implements Cluster {
@Override
public void inferHostVersions(ClusterVersionEntity sourceClusterVersion) throws AmbariException {
if (sourceClusterVersion == null) {
- throw new AmbariException("Could not find current stack version of cluster " + this.getClusterName());
+ throw new AmbariException("Could not find current stack version of cluster " + getClusterName());
}
RepositoryVersionState desiredState = sourceClusterVersion.getState();
@@ -1267,44 +1086,44 @@ public class ClusterImpl implements Cluster {
throw new AmbariException("The state must be one of " + validStates);
}
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>();
- HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>();
- List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(this.getClusterName(),
- sourceClusterVersion.getRepositoryVersion().getStack(), sourceClusterVersion.getRepositoryVersion().getVersion());
- if (existingHostVersionEntities != null) {
- for (HostVersionEntity entity : existingHostVersionEntities) {
- existingHostsWithClusterStackAndVersion.add(entity.getHostName());
- existingHostStackVersions.put(entity.getHostName(), entity);
- }
- }
-
- Map<String, Host> hosts = clusters.getHostsForCluster(this.getClusterName());
-
- Sets.SetView<String> hostsMissingRepoVersion = Sets.difference(hosts.keySet(), existingHostsWithClusterStackAndVersion);
-
- for (String hostname : hosts.keySet()) {
- if (hostsMissingRepoVersion.contains(hostname)) {
- // Create new host stack version
- HostEntity hostEntity = hostDAO.findByName(hostname);
- HostVersionEntity hostVersionEntity = new HostVersionEntity(hostname, sourceClusterVersion.getRepositoryVersion(), RepositoryVersionState.INSTALLING);
- hostVersionEntity.setHostEntity(hostEntity);
- hostVersionDAO.create(hostVersionEntity);
- } else {
- // Update existing host stack version
- HostVersionEntity hostVersionEntity = existingHostStackVersions.get(hostname);
- hostVersionEntity.setState(desiredState);
- hostVersionDAO.merge(hostVersionEntity);
- }
+ Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>();
+ HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>();
+ List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(
+ getClusterName(),
+ sourceClusterVersion.getRepositoryVersion().getStack(),
+ sourceClusterVersion.getRepositoryVersion().getVersion());
+ if (existingHostVersionEntities != null) {
+ for (HostVersionEntity entity : existingHostVersionEntities) {
+ existingHostsWithClusterStackAndVersion.add(entity.getHostName());
+ existingHostStackVersions.put(entity.getHostName(), entity);
+ }
+ }
+
+ Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
+
+ Sets.SetView<String> hostsMissingRepoVersion = Sets.difference(
+ hosts.keySet(), existingHostsWithClusterStackAndVersion);
+
+ for (String hostname : hosts.keySet()) {
+ if (hostsMissingRepoVersion.contains(hostname)) {
+ // Create new host stack version
+ HostEntity hostEntity = hostDAO.findByName(hostname);
+ HostVersionEntity hostVersionEntity = new HostVersionEntity(hostname,
+ sourceClusterVersion.getRepositoryVersion(),
+ RepositoryVersionState.INSTALLING);
+ hostVersionEntity.setHostEntity(hostEntity);
+ hostVersionDAO.create(hostVersionEntity);
+ } else {
+ // Update existing host stack version
+ HostVersionEntity hostVersionEntity = existingHostStackVersions.get(hostname);
+ hostVersionEntity.setState(desiredState);
+ hostVersionDAO.merge(hostVersionEntity);
}
- } finally {
- readWriteLock.writeLock().unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@@ -1368,102 +1187,117 @@ public class ClusterImpl implements Cluster {
return;
}
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- // Part 1, bootstrap cluster version if necessary.
- StackId stackId = getCurrentStackVersion();
-
- ClusterVersionEntity clusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(this.getClusterName(),
- stackId.getStackId(), repositoryVersion);
-
- if (clusterVersion == null) {
- if (clusterVersionDAO.findByCluster(getClusterName()).isEmpty()) {
- // During an Ambari Upgrade from 1.7.0 -> 2.0.0, the Cluster Version will not exist, so bootstrap it.
- createClusterVersionInternal(stackId.getStackId(), repositoryVersion, AuthorizationHelper.getAuthenticatedName(configuration.getAnonymousAuditName()), RepositoryVersionState.UPGRADING);
- clusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(this.getClusterName(), stackId.getStackId(), repositoryVersion);
- } else {
- throw new AmbariException(String.format("Repository version %s not found for cluster %s", repositoryVersion, getClusterName()));
- }
- }
-
- // Ignore if cluster version is CURRENT or UPGRADE_FAILED
- if (clusterVersion.getState() != RepositoryVersionState.INSTALL_FAILED &&
- clusterVersion.getState() != RepositoryVersionState.OUT_OF_SYNC &&
- clusterVersion.getState() != RepositoryVersionState.INSTALLING &&
- clusterVersion.getState() != RepositoryVersionState.INSTALLED &&
- clusterVersion.getState() != RepositoryVersionState.UPGRADING &&
- clusterVersion.getState() != RepositoryVersionState.UPGRADED) {
- // anything else is not supported as of now
- return;
- }
-
- // Part 2, check for transitions.
- Map<String, Host> hosts = clusters.getHostsForCluster(this.getClusterName());
-
- Set<Host> hostsWithoutHostVersion = new HashSet<Host>();
- Map<RepositoryVersionState, Set<String>> stateToHosts = new HashMap<RepositoryVersionState, Set<String>>();
- for (Host host : hosts.values()) {
- String hostName = host.getHostName();
- HostVersionEntity hostVersion = hostVersionDAO.findByClusterStackVersionAndHost(this.getClusterName(), stackId.getStackId(), repositoryVersion, hostName);
- if (hostVersion == null) {
- // This host either has not had a chance to heartbeat yet with its installed component, or it has components
- // that do not advertise a version.
- hostsWithoutHostVersion.add(host);
- continue;
- }
-
- RepositoryVersionState hostState = hostVersion.getState();
- if (host.getState() != HostState.HEALTHY) {
- hostState = RepositoryVersionState.OUT_OF_SYNC;
- LOG.warn(String.format("Host %s is in unhealthy state, treating as %s", hostName, hostState));
- }
-
- if (stateToHosts.containsKey(hostState)) {
- stateToHosts.get(hostState).add(hostName);
- } else {
- Set<String> hostsInState = new HashSet<String>();
- hostsInState.add(hostName);
- stateToHosts.put(hostState, hostsInState);
- }
- }
-
- // Ensure that all of the hosts without a Host Version only have Components that do not advertise a version.
- // Otherwise, operations are still in progress.
- for (Host host : hostsWithoutHostVersion) {
- HostEntity hostEntity = hostDAO.findByName(host.getHostName());
- final Collection<HostComponentStateEntity> allHostComponents = hostEntity.getHostComponentStateEntities();
-
- for (HostComponentStateEntity hostComponentStateEntity: allHostComponents) {
- if (hostComponentStateEntity.getVersion().equalsIgnoreCase(State.UNKNOWN.toString())) {
- // Some Components cannot advertise a version. E.g., ZKF, AMS, Kerberos
- ComponentInfo compInfo = ambariMetaInfo.getComponent(
- stackId.getStackName(), stackId.getStackVersion(), hostComponentStateEntity.getServiceName(),
- hostComponentStateEntity.getComponentName());
-
- if (compInfo.isVersionAdvertised()) {
- LOG.debug("Skipping transitioning the cluster version because host " + host.getHostName() + " does not have a version yet.");
- return;
- }
+ // Part 1, bootstrap cluster version if necessary.
+ StackId stackId = getCurrentStackVersion();
+
+ ClusterVersionEntity clusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
+ getClusterName(), stackId.getStackId(), repositoryVersion);
+
+ if (clusterVersion == null) {
+ if (clusterVersionDAO.findByCluster(getClusterName()).isEmpty()) {
+ // During an Ambari Upgrade from 1.7.0 -> 2.0.0, the Cluster Version
+ // will not exist, so bootstrap it.
+ createClusterVersionInternal(
+ stackId.getStackId(),
+ repositoryVersion,
+ AuthorizationHelper.getAuthenticatedName(configuration.getAnonymousAuditName()),
+ RepositoryVersionState.UPGRADING);
+ clusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
+ getClusterName(), stackId.getStackId(), repositoryVersion);
+ } else {
+ throw new AmbariException(String.format(
+ "Repository version %s not found for cluster %s",
+ repositoryVersion, getClusterName()));
+ }
+ }
+
+ // Ignore if cluster version is CURRENT or UPGRADE_FAILED
+ if (clusterVersion.getState() != RepositoryVersionState.INSTALL_FAILED &&
+ clusterVersion.getState() != RepositoryVersionState.OUT_OF_SYNC &&
+ clusterVersion.getState() != RepositoryVersionState.INSTALLING &&
+ clusterVersion.getState() != RepositoryVersionState.INSTALLED &&
+ clusterVersion.getState() != RepositoryVersionState.UPGRADING &&
+ clusterVersion.getState() != RepositoryVersionState.UPGRADED) {
+ // anything else is not supported as of now
+ return;
+ }
+
+ // Part 2, check for transitions.
+ Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
+
+ Set<Host> hostsWithoutHostVersion = new HashSet<Host>();
+ Map<RepositoryVersionState, Set<String>> stateToHosts = new HashMap<RepositoryVersionState, Set<String>>();
+ for (Host host : hosts.values()) {
+ String hostName = host.getHostName();
+ HostVersionEntity hostVersion = hostVersionDAO.findByClusterStackVersionAndHost(
+ getClusterName(), stackId.getStackId(), repositoryVersion, hostName);
+ if (hostVersion == null) {
+ // This host either has not had a chance to heartbeat yet with its
+ // installed component, or it has components
+ // that do not advertise a version.
+ hostsWithoutHostVersion.add(host);
+ continue;
+ }
+
+ RepositoryVersionState hostState = hostVersion.getState();
+ if (host.getState() != HostState.HEALTHY) {
+ hostState = RepositoryVersionState.OUT_OF_SYNC;
+ LOG.warn(String.format(
+ "Host %s is in unhealthy state, treating as %s", hostName,
+ hostState));
+ }
+
+ if (stateToHosts.containsKey(hostState)) {
+ stateToHosts.get(hostState).add(hostName);
+ } else {
+ Set<String> hostsInState = new HashSet<String>();
+ hostsInState.add(hostName);
+ stateToHosts.put(hostState, hostsInState);
+ }
+ }
+
+ // Ensure that all of the hosts without a Host Version only have
+ // Components that do not advertise a version.
+ // Otherwise, operations are still in progress.
+ for (Host host : hostsWithoutHostVersion) {
+ HostEntity hostEntity = hostDAO.findByName(host.getHostName());
+ final Collection<HostComponentStateEntity> allHostComponents = hostEntity.getHostComponentStateEntities();
+
+ for (HostComponentStateEntity hostComponentStateEntity : allHostComponents) {
+ if (hostComponentStateEntity.getVersion().equalsIgnoreCase(
+ State.UNKNOWN.toString())) {
+ // Some Components cannot advertise a version. E.g., ZKF, AMS,
+ // Kerberos
+ ComponentInfo compInfo = ambariMetaInfo.getComponent(
+ stackId.getStackName(), stackId.getStackVersion(),
+ hostComponentStateEntity.getServiceName(),
+ hostComponentStateEntity.getComponentName());
+
+ if (compInfo.isVersionAdvertised()) {
+ LOG.debug("Skipping transitioning the cluster version because host "
+ + host.getHostName() + " does not have a version yet.");
+ return;
}
}
}
-
- RepositoryVersionState effectiveClusterVersionState = getEffectiveState(stateToHosts);
- if (effectiveClusterVersionState != null && effectiveClusterVersionState != clusterVersion.getState()) {
- // Any mismatch will be caught while transitioning, and raise an exception.
- try {
- transitionClusterVersion(stackId.getStackId(), repositoryVersion, effectiveClusterVersionState);
- } catch (AmbariException e) {
- ;
- }
+ }
+
+ RepositoryVersionState effectiveClusterVersionState = getEffectiveState(stateToHosts);
+ if (effectiveClusterVersionState != null
+ && effectiveClusterVersionState != clusterVersion.getState()) {
+ // Any mismatch will be caught while transitioning, and raise an
+ // exception.
+ try {
+ transitionClusterVersion(stackId.getStackId(), repositoryVersion,
+ effectiveClusterVersionState);
+ } catch (AmbariException e) {
+ ;
}
- } finally {
- readWriteLock.writeLock().unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@@ -1479,6 +1313,7 @@ public class ClusterImpl implements Cluster {
* @param stack
* @throws AmbariException
*/
+ @Override
@Transactional
public HostVersionEntity transitionHostVersionState(HostEntity host, final RepositoryVersionEntity repositoryVersion, final StackId stack) throws AmbariException {
HostVersionEntity hostVersionEntity = null;
@@ -1489,7 +1324,7 @@ public class ClusterImpl implements Cluster {
hostVersionEntity.setHostEntity(host);
hostVersionDAO.create(hostVersionEntity);
} else {
- hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(this.getClusterName(), repositoryVersion.getStack(), repositoryVersion.getVersion(), host.getHostName());
+ hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(getClusterName(), repositoryVersion.getStack(), repositoryVersion.getVersion(), host.getHostName());
if (hostVersionEntity == null) {
throw new AmbariException("Host " + host.getHostName() + " is expected to have a Host Version for stack " + repositoryVersion.getStackVersion());
}
@@ -1517,7 +1352,7 @@ public class ClusterImpl implements Cluster {
// If 0 or 1 cluster version exists, then a brand new cluster permits the host to transition from UPGRADING->CURRENT
// If multiple cluster versions exist, then it means that the change in versions is happening due to an Upgrade,
// so should only allow transitioning to UPGRADED or UPGRADING, depending on further circumstances.
- List<ClusterVersionEntity> clusterVersions = clusterVersionDAO.findByCluster(this.getClusterName());
+ List<ClusterVersionEntity> clusterVersions = clusterVersionDAO.findByCluster(getClusterName());
final int versionedPlusNoVersionNeededSize = versionedHostComponents.size() + noVersionNeededComponents.size();
if (clusterVersions.size() <= 1) {
// Transition from UPGRADING -> CURRENT. This is allowed because Host Version Entity is bootstrapped in an UPGRADING state.
@@ -1541,7 +1376,7 @@ public class ClusterImpl implements Cluster {
} else{
// HostVersion is INSTALLED and an upgrade is in-progress because at least 2 components have different versions,
// Or the host has no components that advertise a version, so still consider it as UPGRADING.
- if (hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED) && versionedHostComponents.size() > 0 &&
+ if (hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED) && versionedHostComponents.size() > 0 &&
!ServiceComponentHostImpl.haveSameVersion(versionedHostComponents)) {
hostVersionEntity.setState(RepositoryVersionState.UPGRADING);
hostVersionDAO.merge(hostVersionEntity);
@@ -1554,38 +1389,29 @@ public class ClusterImpl implements Cluster {
@Override
public void recalculateAllClusterVersionStates() throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- List<ClusterVersionEntity> clusterVersionEntities = clusterVersionDAO.findByCluster(getClusterName());
- StackId currentStackId = getCurrentStackVersion();
- for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
- if (clusterVersionEntity.getRepositoryVersion().getStack().equals(currentStackId.getStackId())
+ List<ClusterVersionEntity> clusterVersionEntities = clusterVersionDAO.findByCluster(getClusterName());
+ StackId currentStackId = getCurrentStackVersion();
+ for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
+ if (clusterVersionEntity.getRepositoryVersion().getStack().equals(
+ currentStackId.getStackId())
&& clusterVersionEntity.getState() != RepositoryVersionState.CURRENT) {
- recalculateClusterVersionState(clusterVersionEntity.getRepositoryVersion().getVersion());
- }
+ recalculateClusterVersionState(clusterVersionEntity.getRepositoryVersion().getVersion());
}
- } finally {
- readWriteLock.writeLock().unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void createClusterVersion(String stack, String version, String userName, RepositoryVersionState state) throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- createClusterVersionInternal(stack, version, userName, state);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ createClusterVersionInternal(stack, version, userName, state);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@@ -1607,10 +1433,10 @@ public class ClusterImpl implements Cluster {
throw new AmbariException("The allowed state for a new cluster version must be within " + allowedStates);
}
- ClusterVersionEntity existing = clusterVersionDAO.findByClusterAndStackAndVersion(this.getClusterName(), stack, version);
+ ClusterVersionEntity existing = clusterVersionDAO.findByClusterAndStackAndVersion(getClusterName(), stack, version);
if (existing != null) {
throw new DuplicateResourceException("Duplicate item, a cluster version with stack=" + stack + ", version=" +
- version + " for cluster " + this.getClusterName() + " already exists");
+ version + " for cluster " + getClusterName() + " already exists");
}
RepositoryVersionEntity repositoryVersionEntity = repositoryVersionDAO.findByStackAndVersion(stack, version);
@@ -1618,7 +1444,7 @@ public class ClusterImpl implements Cluster {
throw new AmbariException("Could not find repository version for stack=" + stack + ", version=" + version );
}
- ClusterVersionEntity clusterVersionEntity = new ClusterVersionEntity(this.clusterEntity, repositoryVersionEntity, state, System.currentTimeMillis(), System.currentTimeMillis(), userName);
+ ClusterVersionEntity clusterVersionEntity = new ClusterVersionEntity(clusterEntity, repositoryVersionEntity, state, System.currentTimeMillis(), System.currentTimeMillis(), userName);
clusterVersionDAO.create(clusterVersionEntity);
}
@@ -1633,202 +1459,175 @@ public class ClusterImpl implements Cluster {
@Transactional
public void transitionClusterVersion(String stack, String version, RepositoryVersionState state) throws AmbariException {
Set<RepositoryVersionState> allowedStates = new HashSet<RepositoryVersionState>();
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- ClusterVersionEntity existingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(this.getClusterName(), stack, version);
- if (existingClusterVersion == null) {
- throw new AmbariException("Existing cluster version not found for cluster=" + this.getClusterName() + ", stack=" + stack + ", version=" + version);
- }
-
- if (existingClusterVersion.getState() != state) {
- switch (existingClusterVersion.getState()) {
- case CURRENT:
- // If CURRENT state is changed here cluster will not have CURRENT state.
- // CURRENT state will be changed to INSTALLED when another CURRENT state is added.
- // allowedStates.add(RepositoryVersionState.INSTALLED);
- break;
- case INSTALLING:
- allowedStates.add(RepositoryVersionState.INSTALLED);
- allowedStates.add(RepositoryVersionState.INSTALL_FAILED);
- allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
- break;
- case INSTALL_FAILED:
- allowedStates.add(RepositoryVersionState.INSTALLING);
- break;
- case INSTALLED:
- allowedStates.add(RepositoryVersionState.INSTALLING);
- allowedStates.add(RepositoryVersionState.UPGRADING);
- allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
- break;
- case OUT_OF_SYNC:
- allowedStates.add(RepositoryVersionState.INSTALLING);
- break;
- case UPGRADING:
- allowedStates.add(RepositoryVersionState.UPGRADED);
- allowedStates.add(RepositoryVersionState.UPGRADE_FAILED);
- if (clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()) == null) {
- allowedStates.add(RepositoryVersionState.CURRENT);
- }
- break;
- case UPGRADED:
+ ClusterVersionEntity existingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
+ getClusterName(), stack, version);
+ if (existingClusterVersion == null) {
+ throw new AmbariException(
+ "Existing cluster version not found for cluster="
+ + getClusterName() + ", stack=" + stack + ", version="
+ + version);
+ }
+
+ if (existingClusterVersion.getState() != state) {
+ switch (existingClusterVersion.getState()) {
+ case CURRENT:
+ // If CURRENT state is changed here cluster will not have CURRENT
+ // state.
+ // CURRENT state will be changed to INSTALLED when another CURRENT
+ // state is added.
+ // allowedStates.add(RepositoryVersionState.INSTALLED);
+ break;
+ case INSTALLING:
+ allowedStates.add(RepositoryVersionState.INSTALLED);
+ allowedStates.add(RepositoryVersionState.INSTALL_FAILED);
+ allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
+ break;
+ case INSTALL_FAILED:
+ allowedStates.add(RepositoryVersionState.INSTALLING);
+ break;
+ case INSTALLED:
+ allowedStates.add(RepositoryVersionState.INSTALLING);
+ allowedStates.add(RepositoryVersionState.UPGRADING);
+ allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
+ break;
+ case OUT_OF_SYNC:
+ allowedStates.add(RepositoryVersionState.INSTALLING);
+ break;
+ case UPGRADING:
+ allowedStates.add(RepositoryVersionState.UPGRADED);
+ allowedStates.add(RepositoryVersionState.UPGRADE_FAILED);
+ if (clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()) == null) {
allowedStates.add(RepositoryVersionState.CURRENT);
- break;
- case UPGRADE_FAILED:
- allowedStates.add(RepositoryVersionState.UPGRADING);
- break;
- }
+ }
+ break;
+ case UPGRADED:
+ allowedStates.add(RepositoryVersionState.CURRENT);
+ break;
+ case UPGRADE_FAILED:
+ allowedStates.add(RepositoryVersionState.UPGRADING);
+ break;
+ }
- if (!allowedStates.contains(state)) {
- throw new AmbariException("Invalid cluster version transition from " + existingClusterVersion.getState() + " to " + state);
- }
+ if (!allowedStates.contains(state)) {
+ throw new AmbariException("Invalid cluster version transition from "
+ + existingClusterVersion.getState() + " to " + state);
+ }
- // There must be at most one cluster version whose state is CURRENT at all times.
- if (state == RepositoryVersionState.CURRENT) {
- ClusterVersionEntity currentVersion = clusterVersionDAO.findByClusterAndStateCurrent(this.getClusterName());
- if (currentVersion != null) {
- currentVersion.setState(RepositoryVersionState.INSTALLED);
- clusterVersionDAO.merge(currentVersion);
- }
+ // There must be at most one cluster version whose state is CURRENT at
+ // all times.
+ if (state == RepositoryVersionState.CURRENT) {
+ ClusterVersionEntity currentVersion = clusterVersionDAO.findByClusterAndStateCurrent(getClusterName());
+ if (currentVersion != null) {
+ currentVersion.setState(RepositoryVersionState.INSTALLED);
+ clusterVersionDAO.merge(currentVersion);
}
-
- existingClusterVersion.setState(state);
- existingClusterVersion.setEndTime(System.currentTimeMillis());
- clusterVersionDAO.merge(existingClusterVersion);
}
- } catch (RollbackException e) {
- String message = "Unable to transition stack " + stack + " at version " + version + " for cluster " + getClusterName() + " to state " + state;
- LOG.warn(message);
- throw new AmbariException(message, e);
- } finally {
- readWriteLock.writeLock().unlock();
+
+ existingClusterVersion.setState(state);
+ existingClusterVersion.setEndTime(System.currentTimeMillis());
+ clusterVersionDAO.merge(existingClusterVersion);
}
+ } catch (RollbackException e) {
+ String message = "Unable to transition stack " + stack + " at version "
+ + version + " for cluster " + getClusterName() + " to state " + state;
+ LOG.warn(message);
+ throw new AmbariException(message, e);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void setCurrentStackVersion(StackId stackVersion)
throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId());
- if (clusterStateEntity == null) {
- clusterStateEntity = new ClusterStateEntity();
- clusterStateEntity.setClusterId(clusterEntity.getClusterId());
- clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
- clusterStateEntity.setClusterEntity(clusterEntity);
- clusterStateDAO.create(clusterStateEntity);
- clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
- clusterEntity.setClusterStateEntity(clusterStateEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- } else {
- clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
- clusterStateDAO.merge(clusterStateEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- }
- } catch (RollbackException e) {
- LOG.warn("Unable to set version " + stackVersion + " for cluster " + getClusterName());
- throw new AmbariException("Unable to set"
- + " version=" + stackVersion
+ ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId());
+ if (clusterStateEntity == null) {
+ clusterStateEntity = new ClusterStateEntity();
+ clusterStateEntity.setClusterId(clusterEntity.getClusterId());
+ clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
+ clusterStateEntity.setClusterEntity(clusterEntity);
+ clusterStateDAO.create(clusterStateEntity);
+ clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
+ clusterEntity.setClusterStateEntity(clusterStateEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
+ } else {
+ clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
+ clusterStateDAO.merge(clusterStateEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
+ }
+ } catch (RollbackException e) {
+ LOG.warn("Unable to set version " + stackVersion + " for cluster "
+ + getClusterName());
+ throw new AmbariException("Unable to set" + " version=" + stackVersion
+ " for cluster " + getClusterName(), e);
- } finally {
- writeLock.unlock();
- }
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public Map<String, Config> getConfigsByType(String configType) {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (!allConfigs.containsKey(configType)) {
- return null;
- }
-
- return Collections.unmodifiableMap(allConfigs.get(configType));
- } finally {
- readWriteLock.writeLock().unlock();
+ if (!allConfigs.containsKey(configType)) {
+ return null;
}
+
+ return Collections.unmodifiableMap(allConfigs.get(configType));
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public Config getConfig(String configType, String versionTag) {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- if (!allConfigs.containsKey(configType)
+ if (!allConfigs.containsKey(configType)
|| !allConfigs.get(configType).containsKey(versionTag)) {
- return null;
- }
- return allConfigs.get(configType).get(versionTag);
- } finally {
- readWriteLock.readLock().unlock();
+ return null;
}
+ return allConfigs.get(configType).get(versionTag);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public void addConfig(Config config) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (config.getType() == null
- || config.getType().isEmpty()) {
- throw new IllegalArgumentException("Config type cannot be empty");
- }
- if (!allConfigs.containsKey(config.getType())) {
- allConfigs.put(config.getType(), new HashMap<String, Config>());
- }
-
- allConfigs.get(config.getType()).put(config.getTag(), config);
- } finally {
- readWriteLock.writeLock().unlock();
+ if (config.getType() == null || config.getType().isEmpty()) {
+ throw new IllegalArgumentException("Config type cannot be empty");
}
+ if (!allConfigs.containsKey(config.getType())) {
+ allConfigs.put(config.getType(), new HashMap<String, Config>());
+ }
+
+ allConfigs.get(config.getType()).put(config.getTag(), config);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public Collection<Config> getAllConfigs() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- List<Config> list = new ArrayList<Config>();
- for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) {
- for (Config config : entry.getValue().values()) {
- list.add(config);
- }
+ List<Config> list = new ArrayList<Config>();
+ for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) {
+ for (Config config : entry.getValue().values()) {
+ list.add(config);
}
- return Collections.unmodifiableList(list);
- } finally {
- readWriteLock.readLock().unlock();
}
+ return Collections.unmodifiableList(list);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -1836,20 +1635,15 @@ public class ClusterImpl implements Cluster {
throws AmbariException {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
+ Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
- return new ClusterResponse(getClusterId(),
- getClusterName(), getProvisioningState(), getSecurityType(), hosts.keySet(), hosts.size(),
- getDesiredStackVersion().getStackId(), getClusterHealthReport());
- } finally {
- readWriteLock.readLock().unlock();
- }
+ return new ClusterResponse(getClusterId(), getClusterName(),
+ getProvisioningState(), getSecurityType(), hosts.keySet(),
+ hosts.size(), getDesiredStackVersion().getStackId(),
+ getClusterHealthReport());
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -1857,48 +1651,36 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- sb.append("Cluster={ clusterName=").append(getClusterName())
- .append(", clusterId=").append(getClusterId())
- .append(", desiredStackVersion=").append(desiredStackVersion.getStackId())
- .append(", services=[ ");
- boolean first = true;
- for (Service s : services.values()) {
- if (!first) {
- sb.append(" , ");
- }
- first = false;
- sb.append("\n ");
- s.debugDump(sb);
- sb.append(' ');
- }
- sb.append(" ] }");
- } finally {
- readWriteLock.readLock().unlock();
- }
+ sb.append("Cluster={ clusterName=").append(getClusterName()).append(
+ ", clusterId=").append(getClusterId()).append(
+ ", desiredStackVersion=").append(desiredStackVersion.getStackId()).append(
+ ", services=[ ");
+ boolean first = true;
+ for (Service s : services.values()) {
+ if (!first) {
+ sb.append(" , ");
+ }
+ first = false;
+ sb.append("\n ");
+ s.debugDump(sb);
+ sb.append(' ');
+ }
+ sb.append(" ] }");
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@Transactional
public void refresh() {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- clusterEntity = clusterDAO.findById(clusterEntity.getClusterId());
- clusterDAO.refresh(clusterEntity);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ clusterEntity = clusterDAO.findById(clusterEntity.getClusterId());
+ clusterDAO.refresh(clusterEntity);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -1907,31 +1689,25 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- LOG.info("Deleting all services for cluster"
- + ", clusterName=" + getClusterName());
- for (Service service : services.values()) {
- if (!service.canBeRemoved()) {
- throw new AmbariException("Found non removable service when trying to"
- + " all services from cluster"
- + ", clusterName=" + getClusterName()
- + ", serviceName=" + service.getName());
- }
- }
-
- for (Service service : services.values()) {
- service.delete();
+ LOG.info("Deleting all services for cluster" + ", clusterName="
+ + getClusterName());
+ for (Service service : services.values()) {
+ if (!service.canBeRemoved()) {
+ throw new AmbariException(
+ "Found non removable service when trying to"
+ + " all services from cluster" + ", clusterName="
+ + getClusterName() + ", serviceName=" + service.getName());
}
+ }
- services.clear();
- } finally {
- readWriteLock.writeLock().unlock();
+ for (Service service : services.values()
<TRUNCATED>