You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/03/05 19:25:06 UTC
[1/3] ambari git commit: AMBARI-9368 - Deadlock Between Dependent
Cluster/Service/Component/Host Implementations (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/branch-1.7.0 7566e570a -> dd572d354
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd572d35/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index ade5792..15c16b0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -131,12 +131,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.INSTALLED,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
new ServiceComponentHostOpCompletedTransition())
-
+
.addTransition(State.INSTALLED,
State.INSTALLED,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
new ServiceComponentHostOpCompletedTransition())
-
+
.addTransition(State.INSTALLING,
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS,
@@ -202,7 +202,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.STARTING,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS,
new ServiceComponentHostOpInProgressTransition())
-
+
.addTransition(State.STARTING,
State.STARTING,
ServiceComponentHostEventType.HOST_SVCCOMP_START,
@@ -211,7 +211,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.STARTED,
ServiceComponentHostEventType.HOST_SVCCOMP_STARTED,
new ServiceComponentHostOpCompletedTransition())
-
+
.addTransition(State.STARTING,
State.INSTALLED,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_FAILED,
@@ -383,7 +383,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_INSTALL,
new ServiceComponentHostOpStartedTransition())
-
+
.addTransition(State.INSTALLING,
State.INSTALLING,
ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS,
@@ -557,56 +557,44 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
private void resetLastOpInfo() {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- try {
- writeLock.lock();
- setLastOpStartTime(-1);
- setLastOpLastUpdateTime(-1);
- setLastOpEndTime(-1);
- } finally {
- writeLock.unlock();
- }
+ setLastOpStartTime(-1);
+ setLastOpLastUpdateTime(-1);
+ setLastOpEndTime(-1);
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
-
}
private void updateLastOpInfo(ServiceComponentHostEventType eventType,
long time) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- try {
- writeLock.lock();
- switch (eventType) {
- case HOST_SVCCOMP_INSTALL:
- case HOST_SVCCOMP_START:
- case HOST_SVCCOMP_STOP:
- case HOST_SVCCOMP_UNINSTALL:
- case HOST_SVCCOMP_WIPEOUT:
- case HOST_SVCCOMP_OP_RESTART:
- resetLastOpInfo();
- setLastOpStartTime(time);
- break;
- case HOST_SVCCOMP_OP_FAILED:
- case HOST_SVCCOMP_OP_SUCCEEDED:
- case HOST_SVCCOMP_STOPPED:
- case HOST_SVCCOMP_STARTED:
- setLastOpLastUpdateTime(time);
- setLastOpEndTime(time);
- break;
- case HOST_SVCCOMP_OP_IN_PROGRESS:
- setLastOpLastUpdateTime(time);
- break;
- }
- } finally {
- writeLock.unlock();
+ switch (eventType) {
+ case HOST_SVCCOMP_INSTALL:
+ case HOST_SVCCOMP_START:
+ case HOST_SVCCOMP_STOP:
+ case HOST_SVCCOMP_UNINSTALL:
+ case HOST_SVCCOMP_WIPEOUT:
+ case HOST_SVCCOMP_OP_RESTART:
+ resetLastOpInfo();
+ setLastOpStartTime(time);
+ break;
+ case HOST_SVCCOMP_OP_FAILED:
+ case HOST_SVCCOMP_OP_SUCCEEDED:
+ case HOST_SVCCOMP_STOPPED:
+ case HOST_SVCCOMP_STARTED:
+ setLastOpLastUpdateTime(time);
+ setLastOpEndTime(time);
+ break;
+ case HOST_SVCCOMP_OP_IN_PROGRESS:
+ setLastOpLastUpdateTime(time);
+ break;
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
-
}
@AssistedInject
@@ -615,13 +603,13 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
injector.injectMembers(this);
if (serviceComponent.isClientComponent()) {
- this.stateMachine = clientStateMachineFactory.make(this);
+ stateMachine = clientStateMachineFactory.make(this);
} else {
- this.stateMachine = daemonStateMachineFactory.make(this);
+ stateMachine = daemonStateMachineFactory.make(this);
}
this.serviceComponent = serviceComponent;
- this.clusterGlobalLock = serviceComponent.getClusterGlobalLock();
+ clusterGlobalLock = serviceComponent.getClusterGlobalLock();
stateEntity = new HostComponentStateEntity();
stateEntity.setClusterId(serviceComponent.getClusterId());
@@ -646,14 +634,14 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
}
try {
- this.host = clusters.getHost(hostName);
+ host = clusters.getHost(hostName);
} catch (AmbariException e) {
//TODO exception?
LOG.error("Host '{}' was not found" + hostName);
throw new RuntimeException(e);
}
- this.resetLastOpInfo();
+ resetLastOpInfo();
}
@AssistedInject
@@ -663,21 +651,21 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
Injector injector) {
injector.injectMembers(this);
this.serviceComponent = serviceComponent;
- this.clusterGlobalLock = serviceComponent.getClusterGlobalLock();
+ clusterGlobalLock = serviceComponent.getClusterGlobalLock();
this.desiredStateEntity = desiredStateEntity;
this.stateEntity = stateEntity;
//TODO implement State Machine init as now type choosing is hardcoded in above code
if (serviceComponent.isClientComponent()) {
- this.stateMachine = clientStateMachineFactory.make(this);
+ stateMachine = clientStateMachineFactory.make(this);
} else {
- this.stateMachine = daemonStateMachineFactory.make(this);
+ stateMachine = daemonStateMachineFactory.make(this);
}
- this.stateMachine.setCurrentState(stateEntity.getCurrentState());
+ stateMachine.setCurrentState(stateEntity.getCurrentState());
try {
- this.host = clusters.getHost(stateEntity.getHostName());
+ host = clusters.getHost(stateEntity.getHostName());
} catch (AmbariException e) {
//TODO exception? impossible due to database restrictions
LOG.error("Host '{}' was not found " + stateEntity.getHostName());
@@ -689,18 +677,9 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public State getState() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
-
+ // there's no reason to lock around the state machine for this SCH since
+ // the state machine is synchronized
+ return stateMachine.getCurrentState();
}
@Override
@@ -743,8 +722,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
} catch (InvalidStateTransitionException e) {
LOG.debug("Can't handle ServiceComponentHostEvent event at"
+ " current state"
- + ", serviceComponentName=" + this.getServiceComponentName()
- + ", hostName=" + this.getHostName()
+ + ", serviceComponentName=" + getServiceComponentName()
+ + ", hostName=" + getHostName()
+ ", currentState=" + oldState
+ ", eventType=" + event.getType()
+ ", event=" + event);
@@ -760,8 +739,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
if (!oldState.equals(getState())) {
if (LOG.isDebugEnabled()) {
LOG.debug("ServiceComponentHost transitioned to a new state"
- + ", serviceComponentName=" + this.getServiceComponentName()
- + ", hostName=" + this.getHostName()
+ + ", serviceComponentName=" + getServiceComponentName()
+ + ", hostName=" + getHostName()
+ ", oldState=" + oldState
+ ", currentState=" + getState()
+ ", eventType=" + event.getType().name()
@@ -772,48 +751,23 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public String getServiceComponentName() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return serviceComponent.getName();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return serviceComponent.getName();
}
@Override
public String getHostName() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return host.getHostName();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return host.getHostName();
}
/**
* @return the lastOpStartTime
*/
public long getLastOpStartTime() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return lastOpStartTime;
- } finally {
- readLock.unlock();
- }
+ return lastOpStartTime;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@@ -821,16 +775,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* @param lastOpStartTime the lastOpStartTime to set
*/
public void setLastOpStartTime(long lastOpStartTime) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- this.lastOpStartTime = lastOpStartTime;
- } finally {
- writeLock.unlock();
- }
+ this.lastOpStartTime = lastOpStartTime;
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@@ -838,16 +787,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* @return the lastOpEndTime
*/
public long getLastOpEndTime() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return lastOpEndTime;
- } finally {
- readLock.unlock();
- }
+ return lastOpEndTime;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@@ -855,16 +799,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* @param lastOpEndTime the lastOpEndTime to set
*/
public void setLastOpEndTime(long lastOpEndTime) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- this.lastOpEndTime = lastOpEndTime;
- } finally {
- writeLock.unlock();
- }
+ this.lastOpEndTime = lastOpEndTime;
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@@ -872,16 +811,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* @return the lastOpLastUpdateTime
*/
public long getLastOpLastUpdateTime() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return lastOpLastUpdateTime;
- } finally {
- readLock.unlock();
- }
+ return lastOpLastUpdateTime;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@@ -889,250 +823,157 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* @param lastOpLastUpdateTime the lastOpLastUpdateTime to set
*/
public void setLastOpLastUpdateTime(long lastOpLastUpdateTime) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- this.lastOpLastUpdateTime = lastOpLastUpdateTime;
- } finally {
- writeLock.unlock();
- }
+ this.lastOpLastUpdateTime = lastOpLastUpdateTime;
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@Override
public long getClusterId() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return serviceComponent.getClusterId();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return serviceComponent.getClusterId();
}
@Override
public String getServiceName() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return serviceComponent.getServiceName();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return serviceComponent.getServiceName();
}
@Override
public StackId getStackVersion() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class);
- } finally {
- readLock.unlock();
- }
+ return gson.fromJson(stateEntity.getCurrentStackVersion(), StackId.class);
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public void setStackVersion(StackId stackVersion) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- stateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
- saveIfPersisted();
- } finally {
- writeLock.unlock();
- }
+ stateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@Override
public State getDesiredState() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return desiredStateEntity.getDesiredState();
- } finally {
- readLock.unlock();
- }
+ return desiredStateEntity.getDesiredState();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public void setDesiredState(State state) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- desiredStateEntity.setDesiredState(state);
- saveIfPersisted();
- } finally {
- writeLock.unlock();
- }
+ desiredStateEntity.setDesiredState(state);
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@Override
public StackId getDesiredStackVersion() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class);
- } finally {
- readLock.unlock();
- }
+ return gson.fromJson(desiredStateEntity.getDesiredStackVersion(),
+ StackId.class);
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public void setDesiredStackVersion(StackId stackVersion) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
- saveIfPersisted();
- } finally {
- writeLock.unlock();
- }
+ desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@Override
public HostComponentAdminState getComponentAdminState() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- HostComponentAdminState adminState = desiredStateEntity.getAdminState();
- if (adminState == null
- && !serviceComponent.isClientComponent() && !serviceComponent.isMasterComponent()) {
- adminState = HostComponentAdminState.INSERVICE;
- }
- return adminState;
- } finally {
- readLock.unlock();
+ HostComponentAdminState adminState = desiredStateEntity.getAdminState();
+ if (adminState == null && !serviceComponent.isClientComponent()
+ && !serviceComponent.isMasterComponent()) {
+ adminState = HostComponentAdminState.INSERVICE;
}
+ return adminState;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public void setComponentAdminState(HostComponentAdminState attribute) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- desiredStateEntity.setAdminState(attribute);
- saveIfPersisted();
- } finally {
- writeLock.unlock();
- }
+ desiredStateEntity.setAdminState(attribute);
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@Override
public ServiceComponentHostResponse convertToResponse() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- ServiceComponentHostResponse r = new ServiceComponentHostResponse(
- serviceComponent.getClusterName(),
- serviceComponent.getServiceName(),
- serviceComponent.getName(),
- getHostName(),
- getState().toString(),
- getStackVersion().getStackId(),
- getDesiredState().toString(),
- getDesiredStackVersion().getStackId(),
- getComponentAdminState());
-
- r.setActualConfigs(actualConfigs);
+ ServiceComponentHostResponse r = new ServiceComponentHostResponse(
+ serviceComponent.getClusterName(), serviceComponent.getServiceName(),
+ serviceComponent.getName(), getHostName(), getState().toString(),
+ getStackVersion().getStackId(), getDesiredState().toString(),
+ getDesiredStackVersion().getStackId(), getComponentAdminState());
- try {
- r.setStaleConfig(helper.isStaleConfigs(this));
- } catch (Exception e) {
- LOG.error("Could not determine stale config", e);
- }
+ r.setActualConfigs(actualConfigs);
- return r;
- } finally {
- readLock.unlock();
+ try {
+ r.setStaleConfig(helper.isStaleConfigs(this));
+ } catch (Exception e) {
+ LOG.error("Could not determine stale config", e);
}
+
+ return r;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public String getClusterName() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return serviceComponent.getClusterName();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
-
+ return serviceComponent.getClusterName();
}
@Override
public void debugDump(StringBuilder sb) {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- sb.append("ServiceComponentHost={ hostname=").append(getHostName())
- .append(", serviceComponentName=").append(serviceComponent.getName())
- .append(", clusterName=").append(serviceComponent.getClusterName())
- .append(", serviceName=").append(serviceComponent.getServiceName())
- .append(", desiredStackVersion=").append(getDesiredStackVersion())
- .append(", desiredState=").append(getDesiredState())
- .append(", stackVersion=").append(getStackVersion())
- .append(", state=").append(getState())
- .append(" }");
- } finally {
- readLock.unlock();
- }
+ sb.append("ServiceComponentHost={ hostname=").append(getHostName()).append(
+ ", serviceComponentName=").append(serviceComponent.getName()).append(
+ ", clusterName=").append(serviceComponent.getClusterName()).append(
+ ", serviceName=").append(serviceComponent.getServiceName()).append(
+ ", desiredStackVersion=").append(getDesiredStackVersion()).append(
+ ", desiredState=").append(getDesiredState()).append(", stackVersion=").append(
+ getStackVersion()).append(", state=").append(getState()).append(" }");
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
-
}
@Override
@@ -1153,12 +994,23 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public void persist() {
- clusterGlobalLock.readLock().lock();
+ boolean clusterWriteLockAcquired = false;
+ if (!persisted) {
+ clusterGlobalLock.writeLock().lock();
+ clusterWriteLockAcquired = true;
+ }
+
try {
writeLock.lock();
try {
if (!persisted) {
+ // persist the new cluster topology and then release the cluster lock
+ // as it has no more bearing on the rest of this persist() method
persistEntities();
+ clusterGlobalLock.writeLock().unlock();
+ clusterWriteLockAcquired = false;
+
+ // these should still be done with the internal lock
refresh();
host.refresh();
serviceComponent.refresh();
@@ -1170,9 +1022,10 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
writeLock.unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ if (clusterWriteLockAcquired) {
+ clusterGlobalLock.writeLock().unlock();
+ }
}
-
}
@Transactional
@@ -1204,33 +1057,27 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
@Transactional
public void refresh() {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- if (isPersisted()) {
- HostComponentStateEntityPK pk = new HostComponentStateEntityPK();
- HostComponentDesiredStateEntityPK dpk = new HostComponentDesiredStateEntityPK();
- pk.setClusterId(getClusterId());
- pk.setComponentName(getServiceComponentName());
- pk.setServiceName(getServiceName());
- pk.setHostName(getHostName());
- dpk.setClusterId(getClusterId());
- dpk.setComponentName(getServiceComponentName());
- dpk.setServiceName(getServiceName());
- dpk.setHostName(getHostName());
- stateEntity = hostComponentStateDAO.findByPK(pk);
- desiredStateEntity = hostComponentDesiredStateDAO.findByPK(dpk);
- hostComponentStateDAO.refresh(stateEntity);
- hostComponentDesiredStateDAO.refresh(desiredStateEntity);
- }
- } finally {
- writeLock.unlock();
+ if (isPersisted()) {
+ HostComponentStateEntityPK pk = new HostComponentStateEntityPK();
+ HostComponentDesiredStateEntityPK dpk = new HostComponentDesiredStateEntityPK();
+ pk.setClusterId(getClusterId());
+ pk.setComponentName(getServiceComponentName());
+ pk.setServiceName(getServiceName());
+ pk.setHostName(getHostName());
+ dpk.setClusterId(getClusterId());
+ dpk.setComponentName(getServiceComponentName());
+ dpk.setServiceName(getServiceName());
+ dpk.setHostName(getHostName());
+ stateEntity = hostComponentStateDAO.findByPK(pk);
+ desiredStateEntity = hostComponentDesiredStateDAO.findByPK(dpk);
+ hostComponentStateDAO.refresh(stateEntity);
+ hostComponentDesiredStateDAO.refresh(desiredStateEntity);
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
-
}
@Transactional
@@ -1244,19 +1091,21 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public boolean canBeRemoved() {
clusterGlobalLock.readLock().lock();
+ boolean schLockAcquired = false;
try {
- readLock.lock();
- try {
-
- return (getState().isRemovableState());
+ // if unable to read, then writers are writing; cannot remove SCH
+ schLockAcquired = readLock.tryLock();
+ if (!schLockAcquired) {
+ return false;
+ }
- } finally {
+ return (getState().isRemovableState());
+ } finally {
+ if (schLockAcquired) {
readLock.unlock();
}
- } finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -1269,7 +1118,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
removeEntities();
persisted = false;
}
- clusters.getCluster(this.getClusterName()).removeServiceComponentHost(this);
+ clusters.getCluster(getClusterName()).removeServiceComponentHost(this);
} catch (AmbariException ex) {
if (LOG.isDebugEnabled()) {
LOG.error(ex.getMessage());
@@ -1301,7 +1150,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
hostComponentDesiredStateDAO.removeByPK(desiredPK);
}
-
+
@Override
public void updateActualConfigs(Map<String, Map<String, String>> configTags) {
Map<Long, ConfigGroup> configGroupMap;
@@ -1314,165 +1163,120 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
return;
}
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- LOG.debug("Updating actual config tags: " + configTags);
- actualConfigs = new HashMap<String, HostConfig>();
-
- for (Entry<String, Map<String, String>> entry : configTags.entrySet()) {
- String type = entry.getKey();
- Map<String, String> values = new HashMap<String, String>(entry.getValue());
-
- String tag = values.get(ConfigHelper.CLUSTER_DEFAULT_TAG);
- values.remove(ConfigHelper.CLUSTER_DEFAULT_TAG);
-
- HostConfig hc = new HostConfig();
- hc.setDefaultVersionTag(tag);
- actualConfigs.put(type, hc);
-
- if (!values.isEmpty()) {
- for (Entry<String, String> overrideEntry : values.entrySet()) {
- Long groupId = Long.parseLong(overrideEntry.getKey());
- hc.getConfigGroupOverrides().put(groupId, overrideEntry.getValue());
- if (!configGroupMap.containsKey(groupId)) {
- LOG.debug("Config group does not exist, id = " + groupId);
- }
+ LOG.debug("Updating actual config tags: " + configTags);
+ actualConfigs = new HashMap<String, HostConfig>();
+
+ for (Entry<String, Map<String, String>> entry : configTags.entrySet()) {
+ String type = entry.getKey();
+ Map<String, String> values = new HashMap<String, String>(
+ entry.getValue());
+
+ String tag = values.get(ConfigHelper.CLUSTER_DEFAULT_TAG);
+ values.remove(ConfigHelper.CLUSTER_DEFAULT_TAG);
+
+ HostConfig hc = new HostConfig();
+ hc.setDefaultVersionTag(tag);
+ actualConfigs.put(type, hc);
+
+ if (!values.isEmpty()) {
+ for (Entry<String, String> overrideEntry : values.entrySet()) {
+ Long groupId = Long.parseLong(overrideEntry.getKey());
+ hc.getConfigGroupOverrides().put(groupId, overrideEntry.getValue());
+ if (!configGroupMap.containsKey(groupId)) {
+ LOG.debug("Config group does not exist, id = " + groupId);
}
}
}
- } finally {
- writeLock.unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
-
-
-
+
+
+
@Override
public Map<String, HostConfig> getActualConfigs() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return actualConfigs;
- } finally {
- readLock.unlock();
- }
+ return actualConfigs;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
-
}
@Override
public HostState getHostState() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return host.getState();
- } finally {
- readLock.unlock();
- }
+ return host.getState();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
-
+
@Override
public void setMaintenanceState(MaintenanceState state) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- desiredStateEntity.setMaintenanceState(state);
- saveIfPersisted();
- } finally {
- writeLock.unlock();
- }
+ desiredStateEntity.setMaintenanceState(state);
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
@Override
public MaintenanceState getMaintenanceState() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return desiredStateEntity.getMaintenanceState();
- } finally {
- readLock.unlock();
- }
+ return desiredStateEntity.getMaintenanceState();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
-
+
@Override
public void setProcesses(List<Map<String, String>> procs) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- processes = Collections.unmodifiableList(procs);
- } finally {
- writeLock.unlock();
- }
+ processes = Collections.unmodifiableList(procs);
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
-
- @Override
+
+ @Override
public List<Map<String, String>> getProcesses() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return processes;
- } finally {
- readLock.unlock();
- }
+ return processes;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public boolean isRestartRequired() {
- clusterGlobalLock.readLock().lock();
+ readLock.lock();
try {
- readLock.lock();
- try {
- return desiredStateEntity.isRestartRequired();
- } finally {
- readLock.unlock();
- }
+ return desiredStateEntity.isRestartRequired();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readLock.unlock();
}
}
@Override
public void setRestartRequired(boolean restartRequired) {
- clusterGlobalLock.readLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- desiredStateEntity.setRestartRequired(restartRequired);
- saveIfPersisted();
- helper.invalidateStaleConfigsCache(this);
- } finally {
- writeLock.unlock();
- }
+ desiredStateEntity.setRestartRequired(restartRequired);
+ saveIfPersisted();
+ helper.invalidateStaleConfigsCache(this);
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd572d35/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
new file mode 100644
index 0000000..d1b5c3a
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
+import org.apache.ambari.server.ServiceNotFoundException;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
+import org.apache.ambari.server.state.StackId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+
+/**
+ * Tests AMBARI-9368 which produced a deadlock during read and writes of some of
+ * the impl classes.
+ */
+public class ClusterDeadlockTest {
+ private final AtomicInteger hostNameCounter = new AtomicInteger(0);
+
+ @Inject
+ private Injector injector;
+
+ @Inject
+ private Clusters clusters;
+
+ @Inject
+ private ServiceFactory serviceFactory;
+
+ @Inject
+ private ServiceComponentFactory serviceComponentFactory;
+
+ @Inject
+ private ServiceComponentHostFactory serviceComponentHostFactory;
+
+ @Inject
+ private AmbariMetaInfo metaInfo;
+
+
+ @Before
+ public void setup() throws Exception {
+ injector = Guice.createInjector(new InMemoryDefaultTestModule());
+ injector.getInstance(GuiceJpaInitializer.class);
+ injector.injectMembers(this);
+ clusters.addCluster("c1");
+
+ StackId stackId = new StackId("HDP-0.1");
+ Cluster c1 = clusters.getCluster("c1");
+ c1.setDesiredStackVersion(stackId);
+ metaInfo.init();
+
+ // 100 hosts
+ for (int i = 0; i < 100; i++) {
+ String hostName = "c64-" + i;
+ clusters.addHost(hostName);
+ setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
+ clusters.getHost(hostName).persist();
+ clusters.mapHostToCluster(hostName, "c1");
+ }
+
+ // force creation of the service and the components on the last host
+ createNewServiceComponentHost("HDFS", "NAMENODE", "c64-99", false);
+ createNewServiceComponentHost("HDFS", "HDFS_CLIENT", "c64-99", true);
+ }
+
+ @After
+ public void teardown() {
+ injector.getInstance(PersistService.class).stop();
+ }
+
+ /**
+ * Tests that concurrent impl serialization and impl writing doesn't cause a
+ * deadlock.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 30000)
+ public void testDeadlockBetweenImplementations() throws Exception {
+ Cluster cluster = clusters.getCluster("c1");
+ Service service = cluster.getService("HDFS");
+ ServiceComponent namenodeComponent = service.getServiceComponent("NAMENODE");
+ ServiceComponent hdfsClientComponent = service.getServiceComponent("HDFS_CLIENT");
+
+ ServiceComponentHost namenodeSCH = createNewServiceComponentHost("HDFS",
+ "NAMENODE", "c64-0", false);
+
+ ServiceComponentHost hdfsClientSCH = createNewServiceComponentHost("HDFS",
+ "HDFS_CLIENT", "c64-0", true);
+
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < 3; i++) {
+ DeadlockExerciserThread thread = new DeadlockExerciserThread();
+ thread.setCluster(cluster);
+ thread.setService(service);
+ thread.setHdfsClientComponent(hdfsClientComponent);
+ thread.setNamenodeComponent(namenodeComponent);
+ thread.setNamenodeSCH(namenodeSCH);
+ thread.setHdfsClientSCH(hdfsClientSCH);
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ /**
+ * Tests that while serializing a service component, writes to that service
+ * component do not cause a deadlock with the global cluster lock.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 30000)
+ public void testAddingHostComponentsWhileReading() throws Exception {
+ Cluster cluster = clusters.getCluster("c1");
+ Service service = cluster.getService("HDFS");
+ ServiceComponent namenodeComponent = service.getServiceComponent("NAMENODE");
+ ServiceComponent hdfsClientComponent = service.getServiceComponent("HDFS_CLIENT");
+
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < 5; i++) {
+ ServiceComponentDeadlockThread thread = new ServiceComponentDeadlockThread();
+ thread.setHdfsClientComponent(hdfsClientComponent);
+ thread.setNamenodeComponent(namenodeComponent);
+ thread.start();
+ threads.add(thread);
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ /**
+ * Tests AMBARI-9368 which saw a deadlock when adding a service component host
+ * while reading a service component.
+ */
+ private final class ServiceComponentDeadlockThread extends Thread {
+ private ServiceComponent namenodeComponent;
+ private ServiceComponent hdfsClientComponent;
+
+ /**
+ * @param namenodeComponent
+ * the namenodeComponent to set
+ */
+ public void setNamenodeComponent(ServiceComponent namenodeComponent) {
+ this.namenodeComponent = namenodeComponent;
+ }
+
+ /**
+ * @param hdfsClientComponent
+ * the hdfsClientComponent to set
+ */
+ public void setHdfsClientComponent(ServiceComponent hdfsClientComponent) {
+ this.hdfsClientComponent = hdfsClientComponent;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 15; i++) {
+ int hostNumeric = hostNameCounter.getAndIncrement();
+
+ namenodeComponent.convertToResponse();
+ createNewServiceComponentHost("HDFS", "NAMENODE", "c64-"
+ + hostNumeric, false);
+
+ hdfsClientComponent.convertToResponse();
+ createNewServiceComponentHost("HDFS", "HDFS_CLIENT", "c64-"
+ + hostNumeric, true);
+
+ Thread.sleep(10);
+ }
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+ }
+
+ /**
+ * Tests AMBARI-9368 which produced a deadlock during read and writes of some
+ * of the impl classes.
+ */
+ private static final class DeadlockExerciserThread extends Thread {
+ private Cluster cluster;
+ private Service service;
+ private ServiceComponent namenodeComponent;
+ private ServiceComponent hdfsClientComponent;
+ private ServiceComponentHost namenodeSCH;
+ private ServiceComponentHost hdfsClientSCH;
+
+ /**
+ * @param cluster
+ * the cluster to set
+ */
+ public void setCluster(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * @param service
+ * the service to set
+ */
+ public void setService(Service service) {
+ this.service = service;
+ }
+
+ /**
+ * @param namenodeComponent
+ * the namenodeComponent to set
+ */
+ public void setNamenodeComponent(ServiceComponent namenodeComponent) {
+ this.namenodeComponent = namenodeComponent;
+ }
+
+ /**
+ * @param hdfsClientComponent
+ * the hdfsClientComponent to set
+ */
+ public void setHdfsClientComponent(ServiceComponent hdfsClientComponent) {
+ this.hdfsClientComponent = hdfsClientComponent;
+ }
+
+ /**
+ * @param namenodeSCH
+ * the namenodeSCH to set
+ */
+ public void setNamenodeSCH(ServiceComponentHost namenodeSCH) {
+ this.namenodeSCH = namenodeSCH;
+ }
+
+ /**
+ * @param hdfsClientSCH
+ * the hdfsClientSCH to set
+ */
+ public void setHdfsClientSCH(ServiceComponentHost hdfsClientSCH) {
+ this.hdfsClientSCH = hdfsClientSCH;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 10; i++) {
+ cluster.convertToResponse();
+ service.convertToResponse();
+ namenodeComponent.convertToResponse();
+ hdfsClientComponent.convertToResponse();
+ namenodeSCH.convertToResponse();
+ hdfsClientSCH.convertToResponse();
+
+ cluster.setProvisioningState(org.apache.ambari.server.state.State.INIT);
+ service.setMaintenanceState(MaintenanceState.OFF);
+ namenodeComponent.setDesiredState(org.apache.ambari.server.state.State.STARTED);
+ hdfsClientComponent.setDesiredState(org.apache.ambari.server.state.State.INSTALLED);
+
+ namenodeSCH.setState(org.apache.ambari.server.state.State.STARTED);
+ hdfsClientSCH.setState(org.apache.ambari.server.state.State.INSTALLED);
+
+ Thread.sleep(100);
+ }
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+ }
+
+ private void setOsFamily(Host host, String osFamily, String osVersion) {
+ Map<String, String> hostAttributes = new HashMap<String, String>(2);
+ hostAttributes.put("os_family", osFamily);
+ hostAttributes.put("os_release_version", osVersion);
+ host.setHostAttributes(hostAttributes);
+ }
+
+ private ServiceComponentHost createNewServiceComponentHost(String svc,
+ String svcComponent, String hostName, boolean isClient)
+ throws AmbariException {
+ Cluster c = clusters.getCluster("c1");
+ Assert.assertNotNull(c.getConfigGroups());
+ return createNewServiceComponentHost(c, svc, svcComponent, hostName);
+ }
+
+ private ServiceComponentHost createNewServiceComponentHost(Cluster c,
+ String svc, String svcComponent, String hostName) throws AmbariException {
+
+ Service s = null;
+
+ try {
+ s = c.getService(svc);
+ } catch (ServiceNotFoundException e) {
+ s = serviceFactory.createNew(c, svc);
+ c.addService(s);
+ s.persist();
+ }
+
+ ServiceComponent sc = null;
+ try {
+ sc = s.getServiceComponent(svcComponent);
+ } catch (ServiceComponentNotFoundException e) {
+ sc = serviceComponentFactory.createNew(s, svcComponent);
+ s.addServiceComponent(sc);
+ sc.persist();
+ }
+
+ ServiceComponentHost impl = serviceComponentHostFactory.createNew(sc,
+ hostName);
+
+ impl.persist();
+ return impl;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd572d35/ambari-web/package.json
----------------------------------------------------------------------
diff --git a/ambari-web/package.json b/ambari-web/package.json
index be5a225..4468d36 100644
--- a/ambari-web/package.json
+++ b/ambari-web/package.json
@@ -19,7 +19,7 @@
"devDependencies": {
"phantomjs": "^1.9.2",
"mocha":"1.9.0",
- "mocha-phantomjs": "^3.1.6",
+ "mocha-phantomjs": "~3.1.6",
"chai":"~1.9.0",
"sinon":"=1.7.3",
"sinon-chai":"~2.5.0",
[2/3] ambari git commit: AMBARI-9368 - Deadlock Between Dependent
Cluster/Service/Component/Host Implementations (jonathanhurley)
Posted by jo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd572d35/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 9bde472..1a595b1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -30,15 +30,13 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.persistence.RollbackException;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ConfigGroupNotFoundException;
import org.apache.ambari.server.ObjectNotFoundException;
import org.apache.ambari.server.ParentObjectNotFoundException;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
@@ -47,6 +45,7 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.ClusterResponse;
import org.apache.ambari.server.controller.ConfigurationResponse;
import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.controller.ServiceConfigVersionResponse;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.cache.ConfigGroupHostMapping;
import org.apache.ambari.server.orm.cache.HostConfigMapping;
@@ -66,26 +65,46 @@ import org.apache.ambari.server.orm.entities.PrivilegeEntity;
import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
import org.apache.ambari.server.orm.entities.ResourceEntity;
import org.apache.ambari.server.orm.entities.ServiceConfigEntity;
-import org.apache.ambari.server.state.*;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ClusterHealthReport;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigFactory;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ConfigVersionHelper;
+import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostHealthStatus;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.PropertyInfo;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceFactory;
+import org.apache.ambari.server.state.ServiceInfo;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
import org.apache.ambari.server.state.configgroup.ConfigGroup;
import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.scheduler.RequestExecution;
import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
-import org.apache.ambari.server.controller.ServiceConfigVersionResponse;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.ConfigGroupNotFoundException;
public class ClusterImpl implements Cluster {
@@ -128,14 +147,10 @@ public class ClusterImpl implements Cluster {
*/
private Map<Long, RequestExecution> requestExecutions;
- private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private Lock readLock = readWriteLock.readLock();
- private Lock writeLock = readWriteLock.writeLock();
-
private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock();
private ClusterEntity clusterEntity;
-
+
private Set<Alert> clusterAlerts = new HashSet<Alert>();
private final ConfigVersionHelper configVersionHelper;
@@ -177,11 +192,11 @@ public class ClusterImpl implements Cluster {
injector.injectMembers(this);
this.clusterEntity = clusterEntity;
- this.serviceComponentHosts = new HashMap<String,
+ serviceComponentHosts = new HashMap<String,
Map<String, Map<String, ServiceComponentHost>>>();
- this.serviceComponentHostsByHost = new HashMap<String,
+ serviceComponentHostsByHost = new HashMap<String,
List<ServiceComponentHost>>();
- this.desiredStackVersion = gson.fromJson(
+ desiredStackVersion = gson.fromJson(
clusterEntity.getDesiredStackVersion(), StackId.class);
allConfigs = new HashMap<String, Map<String, Config>>();
if (!clusterEntity.getClusterConfigEntities().isEmpty()) {
@@ -259,94 +274,85 @@ public class ClusterImpl implements Cluster {
*/
public void loadServiceHostComponents() {
loadServices();
- if (svcHostsLoaded) return;
+ if (svcHostsLoaded) {
+ return;
+ }
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- LOG.info("Loading Service Host Components");
- if (svcHostsLoaded) return;
- if (services != null) {
- for (Entry<String, Service> serviceKV : services.entrySet()) {
+ LOG.info("Loading Service Host Components");
+ if (svcHostsLoaded) {
+ return;
+ }
+ if (services != null) {
+ for (Entry<String, Service> serviceKV : services.entrySet()) {
/* get all the service component hosts **/
- Service service = serviceKV.getValue();
- if (!serviceComponentHosts.containsKey(service.getName())) {
- serviceComponentHosts.put(service.getName(), new HashMap<String,
- Map<String, ServiceComponentHost>>());
- }
- for (Entry<String, ServiceComponent> svcComponent :
- service.getServiceComponents().entrySet()) {
- ServiceComponent comp = svcComponent.getValue();
- String componentName = svcComponent.getKey();
- if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) {
- serviceComponentHosts.get(service.getName()).put(componentName,
+ Service service = serviceKV.getValue();
+ if (!serviceComponentHosts.containsKey(service.getName())) {
+ serviceComponentHosts.put(service.getName(),
+ new HashMap<String, Map<String, ServiceComponentHost>>());
+ }
+ for (Entry<String, ServiceComponent> svcComponent : service.getServiceComponents().entrySet()) {
+ ServiceComponent comp = svcComponent.getValue();
+ String componentName = svcComponent.getKey();
+ if (!serviceComponentHosts.get(service.getName()).containsKey(
+ componentName)) {
+ serviceComponentHosts.get(service.getName()).put(componentName,
new HashMap<String, ServiceComponentHost>());
- }
- /** Get Service Host Components **/
- for (Entry<String, ServiceComponentHost> svchost :
- comp.getServiceComponentHosts().entrySet()) {
- String hostname = svchost.getKey();
- ServiceComponentHost svcHostComponent = svchost.getValue();
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- serviceComponentHostsByHost.put(hostname,
+ }
+ /** Get Service Host Components **/
+ for (Entry<String, ServiceComponentHost> svchost : comp.getServiceComponentHosts().entrySet()) {
+ String hostname = svchost.getKey();
+ ServiceComponentHost svcHostComponent = svchost.getValue();
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ serviceComponentHostsByHost.put(hostname,
new ArrayList<ServiceComponentHost>());
- }
- List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
- compList.add(svcHostComponent);
+ }
+ List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
+ compList.add(svcHostComponent);
- if (!serviceComponentHosts.get(service.getName()).get(componentName)
- .containsKey(hostname)) {
- serviceComponentHosts.get(service.getName()).get(componentName)
- .put(hostname, svcHostComponent);
- }
+ if (!serviceComponentHosts.get(service.getName()).get(
+ componentName).containsKey(hostname)) {
+ serviceComponentHosts.get(service.getName()).get(componentName).put(
+ hostname, svcHostComponent);
}
}
}
}
- svcHostsLoaded = true;
- } finally {
- writeLock.unlock();
}
+ svcHostsLoaded = true;
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
private void loadServices() {
- //logging here takes too much time
-// LOG.info("clusterEntity " + clusterEntity.getClusterServiceEntities() );
if (services == null) {
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (services == null) {
- services = new TreeMap<String, Service>();
- if (!clusterEntity.getClusterServiceEntities().isEmpty()) {
- for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
- StackId stackId = getCurrentStackVersion();
- try {
- if (ambariMetaInfo.getServiceInfo(stackId.getStackName(), stackId.getStackVersion(),
- serviceEntity.getServiceName()) != null) {
- services.put(serviceEntity.getServiceName(), serviceFactory.createExisting(this, serviceEntity));
- }
- } catch (AmbariException e) {
- LOG.error(String.format("Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s",
- stackId.getStackName(), stackId.getStackVersion(),
- serviceEntity.getServiceName()));
- e.printStackTrace();
+ if (services == null) {
+ services = new TreeMap<String, Service>();
+ if (!clusterEntity.getClusterServiceEntities().isEmpty()) {
+ for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
+ StackId stackId = getCurrentStackVersion();
+ try {
+ if (ambariMetaInfo.getServiceInfo(stackId.getStackName(),
+ stackId.getStackVersion(), serviceEntity.getServiceName()) != null) {
+ services.put(serviceEntity.getServiceName(),
+ serviceFactory.createExisting(this, serviceEntity));
}
+ } catch (AmbariException e) {
+ LOG.error(String.format(
+ "Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s",
+ stackId.getStackName(), stackId.getStackVersion(),
+ serviceEntity.getServiceName()));
+ e.printStackTrace();
}
}
}
- } finally {
- writeLock.unlock();
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
}
@@ -354,20 +360,14 @@ public class ClusterImpl implements Cluster {
if (clusterConfigGroups == null) {
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (clusterConfigGroups == null) {
- clusterConfigGroups = new HashMap<Long, ConfigGroup>();
- if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
- for (ConfigGroupEntity configGroupEntity :
- clusterEntity.getConfigGroupEntities()) {
- clusterConfigGroups.put(configGroupEntity.getGroupId(),
+ if (clusterConfigGroups == null) {
+ clusterConfigGroups = new HashMap<Long, ConfigGroup>();
+ if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
+ for (ConfigGroupEntity configGroupEntity : clusterEntity.getConfigGroupEntities()) {
+ clusterConfigGroups.put(configGroupEntity.getGroupId(),
configGroupFactory.createExisting(this, configGroupEntity));
- }
}
}
- } finally {
- writeLock.unlock();
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -379,20 +379,14 @@ public class ClusterImpl implements Cluster {
if (requestExecutions == null) {
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (requestExecutions == null) {
- requestExecutions = new HashMap<Long, RequestExecution>();
- if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
- for (RequestScheduleEntity scheduleEntity : clusterEntity
- .getRequestScheduleEntities()) {
- requestExecutions.put(scheduleEntity.getScheduleId(),
+ if (requestExecutions == null) {
+ requestExecutions = new HashMap<Long, RequestExecution>();
+ if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
+ for (RequestScheduleEntity scheduleEntity : clusterEntity.getRequestScheduleEntities()) {
+ requestExecutions.put(scheduleEntity.getScheduleId(),
requestExecutionFactory.createExisting(this, scheduleEntity));
- }
}
}
- } finally {
- writeLock.unlock();
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -405,27 +399,20 @@ public class ClusterImpl implements Cluster {
loadConfigGroups();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- LOG.debug("Adding a new Config group"
- + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName()
+ LOG.debug("Adding a new Config group" + ", clusterName = "
+ + getClusterName() + ", groupName = " + configGroup.getName()
+ ", tag = " + configGroup.getTag());
- if (clusterConfigGroups.containsKey(configGroup.getId())) {
- // The loadConfigGroups will load all groups to memory
- LOG.debug("Config group already exists"
- + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName()
+ if (clusterConfigGroups.containsKey(configGroup.getId())) {
+ // The loadConfigGroups will load all groups to memory
+ LOG.debug("Config group already exists"
+ + ", clusterName = " + getClusterName()
+ + ", groupName = " + configGroup.getName()
+ ", groupId = " + configGroup.getId()
- + ", tag = " + configGroup.getTag());
- } else {
- clusterConfigGroups.put(configGroup.getId(), configGroup);
- configHelper.invalidateStaleConfigsCache();
- }
-
- } finally {
- writeLock.unlock();
+ + ", tag = " + configGroup.getTag());
+ } else {
+ clusterConfigGroups.put(configGroup.getId(), configGroup);
+ configHelper.invalidateStaleConfigsCache();
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -437,12 +424,7 @@ public class ClusterImpl implements Cluster {
loadConfigGroups();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return Collections.unmodifiableMap(clusterConfigGroups);
- } finally {
- readLock.unlock();
- }
+ return Collections.unmodifiableMap(clusterConfigGroups);
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -456,23 +438,18 @@ public class ClusterImpl implements Cluster {
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- Set<ConfigGroupHostMapping> hostMappingEntities = configGroupHostMappingDAO.findByHost(hostname);
-
- if (hostMappingEntities != null && !hostMappingEntities.isEmpty()) {
- for (ConfigGroupHostMapping entity : hostMappingEntities) {
- ConfigGroup configGroup = configGroupMap.get(entity.getConfigGroupId());
- if (configGroup != null && !configGroups.containsKey(configGroup.getId())) {
- configGroups.put(configGroup.getId(), configGroup);
- }
+ Set<ConfigGroupHostMapping> hostMappingEntities = configGroupHostMappingDAO.findByHost(hostname);
+
+ if (hostMappingEntities != null && !hostMappingEntities.isEmpty()) {
+ for (ConfigGroupHostMapping entity : hostMappingEntities) {
+ ConfigGroup configGroup = configGroupMap.get(entity.getConfigGroupId());
+ if (configGroup != null
+ && !configGroups.containsKey(configGroup.getId())) {
+ configGroups.put(configGroup.getId(), configGroup);
}
}
- return configGroups;
-
- } finally {
- readLock.unlock();
}
+ return configGroups;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -483,23 +460,16 @@ public class ClusterImpl implements Cluster {
loadRequestExecutions();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- LOG.info("Adding a new request schedule"
- + ", clusterName = " + getClusterName()
- + ", id = " + requestExecution.getId()
- + ", description = " + requestExecution.getDescription());
+ LOG.info("Adding a new request schedule" + ", clusterName = "
+ + getClusterName() + ", id = " + requestExecution.getId()
+ + ", description = " + requestExecution.getDescription());
- if (requestExecutions.containsKey(requestExecution.getId())) {
- LOG.debug("Request schedule already exists"
- + ", clusterName = " + getClusterName()
- + ", id = " + requestExecution.getId()
+ if (requestExecutions.containsKey(requestExecution.getId())) {
+ LOG.debug("Request schedule already exists" + ", clusterName = "
+ + getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
- } else {
- requestExecutions.put(requestExecution.getId(), requestExecution);
- }
- } finally {
- writeLock.unlock();
+ } else {
+ requestExecutions.put(requestExecution.getId(), requestExecution);
}
} finally {
clusterGlobalLock.writeLock().unlock();
@@ -511,12 +481,7 @@ public class ClusterImpl implements Cluster {
loadRequestExecutions();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return Collections.unmodifiableMap(requestExecutions);
- } finally {
- readLock.unlock();
- }
+ return Collections.unmodifiableMap(requestExecutions);
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -527,23 +492,17 @@ public class ClusterImpl implements Cluster {
loadRequestExecutions();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- RequestExecution requestExecution = requestExecutions.get(id);
- if (requestExecution == null) {
- throw new AmbariException("Request schedule does not exists, " +
- "id = " + id);
- }
- LOG.info("Deleting request schedule"
- + ", clusterName = " + getClusterName()
- + ", id = " + requestExecution.getId()
+ RequestExecution requestExecution = requestExecutions.get(id);
+ if (requestExecution == null) {
+ throw new AmbariException("Request schedule does not exists, "
+ + "id = " + id);
+ }
+ LOG.info("Deleting request schedule" + ", clusterName = "
+ + getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
- requestExecution.delete();
- requestExecutions.remove(id);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ requestExecution.delete();
+ requestExecutions.remove(id);
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -554,24 +513,17 @@ public class ClusterImpl implements Cluster {
loadConfigGroups();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- ConfigGroup configGroup = clusterConfigGroups.get(id);
- if (configGroup == null) {
- throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
- }
- LOG.debug("Deleting Config group"
- + ", clusterName = " + getClusterName()
- + ", groupName = " + configGroup.getName()
- + ", groupId = " + configGroup.getId()
- + ", tag = " + configGroup.getTag());
-
- configGroup.delete();
- clusterConfigGroups.remove(id);
- configHelper.invalidateStaleConfigsCache();
- } finally {
- readWriteLock.writeLock().unlock();
+ ConfigGroup configGroup = clusterConfigGroups.get(id);
+ if (configGroup == null) {
+ throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
}
+ LOG.debug("Deleting Config group" + ", clusterName = " + getClusterName()
+ + ", groupName = " + configGroup.getName() + ", groupId = "
+ + configGroup.getId() + ", tag = " + configGroup.getTag());
+
+ configGroup.delete();
+ clusterConfigGroups.remove(id);
+ configHelper.invalidateStaleConfigsCache();
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -582,21 +534,16 @@ public class ClusterImpl implements Cluster {
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- if (!serviceComponentHosts.containsKey(serviceName)
- || !serviceComponentHosts.get(serviceName)
- .containsKey(serviceComponentName)
- || !serviceComponentHosts.get(serviceName).get(serviceComponentName)
- .containsKey(hostname)) {
- throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName,
- serviceComponentName, hostname);
- }
- return serviceComponentHosts.get(serviceName).get(serviceComponentName)
- .get(hostname);
- } finally {
- readLock.unlock();
- }
+ if (!serviceComponentHosts.containsKey(serviceName)
+ || !serviceComponentHosts.get(serviceName).containsKey(
+ serviceComponentName)
+ || !serviceComponentHosts.get(serviceName).get(serviceComponentName).containsKey(
+ hostname)) {
+ throw new ServiceComponentHostNotFoundException(getClusterName(),
+ serviceName, serviceComponentName, hostname);
+ }
+ return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(
+ hostname);
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -605,37 +552,22 @@ public class ClusterImpl implements Cluster {
@Override
public String getClusterName() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return clusterEntity.getClusterName();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
-
+ return clusterEntity.getClusterName();
}
@Override
public void setClusterName(String clusterName) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- String oldName = clusterEntity.getClusterName();
- clusterEntity.setClusterName(clusterName);
- clusterDAO.merge(clusterEntity); //RollbackException possibility if UNIQUE constraint violated
- clusters.updateClusterName(oldName, clusterName);
- } finally {
- writeLock.unlock();
- }
+ String oldName = clusterEntity.getClusterName();
+ clusterEntity.setClusterName(clusterName);
+
+ // RollbackException possibility if UNIQUE constraint violated
+ clusterDAO.merge(clusterEntity);
+ clusters.updateClusterName(oldName, clusterName);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
public void addServiceComponentHost(
@@ -643,72 +575,63 @@ public class ClusterImpl implements Cluster {
loadServiceHostComponents();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache"
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache"
+ ", serviceName=" + svcCompHost.getServiceName()
+ ", componentName=" + svcCompHost.getServiceComponentName()
+ ", hostname=" + svcCompHost.getHostName());
- }
+ }
- final String hostname = svcCompHost.getHostName();
- final String serviceName = svcCompHost.getServiceName();
- final String componentName = svcCompHost.getServiceComponentName();
- Set<Cluster> cs = clusters.getClustersForHost(hostname);
- boolean clusterFound = false;
- Iterator<Cluster> iter = cs.iterator();
- while (iter.hasNext()) {
- Cluster c = iter.next();
- if (c.getClusterId() == this.getClusterId()) {
- clusterFound = true;
- break;
- }
+ final String hostname = svcCompHost.getHostName();
+ final String serviceName = svcCompHost.getServiceName();
+ final String componentName = svcCompHost.getServiceComponentName();
+ Set<Cluster> cs = clusters.getClustersForHost(hostname);
+ boolean clusterFound = false;
+ Iterator<Cluster> iter = cs.iterator();
+ while (iter.hasNext()) {
+ Cluster c = iter.next();
+ if (c.getClusterId() == getClusterId()) {
+ clusterFound = true;
+ break;
}
- if (!clusterFound) {
- throw new AmbariException("Host does not belong this cluster"
- + ", hostname=" + hostname
- + ", clusterName=" + getClusterName()
+ }
+ if (!clusterFound) {
+ throw new AmbariException("Host does not belong this cluster"
+ + ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId());
- }
+ }
- if (!serviceComponentHosts.containsKey(serviceName)) {
- serviceComponentHosts.put(serviceName,
+ if (!serviceComponentHosts.containsKey(serviceName)) {
+ serviceComponentHosts.put(serviceName,
new HashMap<String, Map<String, ServiceComponentHost>>());
- }
- if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
- serviceComponentHosts.get(serviceName).put(componentName,
+ }
+ if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
+ serviceComponentHosts.get(serviceName).put(componentName,
new HashMap<String, ServiceComponentHost>());
- }
+ }
- if (serviceComponentHosts.get(serviceName).get(componentName).
- containsKey(hostname)) {
- throw new AmbariException("Duplicate entry for ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (serviceComponentHosts.get(serviceName).get(componentName).containsKey(
+ hostname)) {
+ throw new AmbariException("Duplicate entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- serviceComponentHostsByHost.put(hostname,
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ serviceComponentHostsByHost.put(hostname,
new ArrayList<ServiceComponentHost>());
- }
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new ServiceComponentHost"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new ServiceComponentHost" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
+ serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
svcCompHost);
- serviceComponentHostsByHost.get(hostname).add(svcCompHost);
- } finally {
- writeLock.unlock();
- }
+ serviceComponentHostsByHost.get(hostname).add(svcCompHost);
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -721,104 +644,83 @@ public class ClusterImpl implements Cluster {
loadServiceHostComponents();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache"
- + ", serviceName=" + svcCompHost.getServiceName()
- + ", componentName=" + svcCompHost.getServiceComponentName()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache"
+ + ", serviceName="
+ + svcCompHost.getServiceName()
+ + ", componentName="
+ + svcCompHost.getServiceComponentName()
+ ", hostname=" + svcCompHost.getHostName());
- }
+ }
- final String hostname = svcCompHost.getHostName();
- final String serviceName = svcCompHost.getServiceName();
- final String componentName = svcCompHost.getServiceComponentName();
- Set<Cluster> cs = clusters.getClustersForHost(hostname);
- boolean clusterFound = false;
- Iterator<Cluster> iter = cs.iterator();
- while (iter.hasNext()) {
- Cluster c = iter.next();
- if (c.getClusterId() == this.getClusterId()) {
- clusterFound = true;
- break;
- }
+ final String hostname = svcCompHost.getHostName();
+ final String serviceName = svcCompHost.getServiceName();
+ final String componentName = svcCompHost.getServiceComponentName();
+ Set<Cluster> cs = clusters.getClustersForHost(hostname);
+ boolean clusterFound = false;
+ Iterator<Cluster> iter = cs.iterator();
+ while (iter.hasNext()) {
+ Cluster c = iter.next();
+ if (c.getClusterId() == getClusterId()) {
+ clusterFound = true;
+ break;
}
- if (!clusterFound) {
- throw new AmbariException("Host does not belong this cluster"
- + ", hostname=" + hostname
- + ", clusterName=" + getClusterName()
+ }
+ if (!clusterFound) {
+ throw new AmbariException("Host does not belong this cluster"
+ + ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId());
- }
+ }
- if (!serviceComponentHosts.containsKey(serviceName)
+ if (!serviceComponentHosts.containsKey(serviceName)
|| !serviceComponentHosts.get(serviceName).containsKey(componentName)
- || !serviceComponentHosts.get(serviceName).get(componentName).
- containsKey(hostname)) {
- throw new AmbariException("Invalid entry for ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
- if (!serviceComponentHostsByHost.containsKey(hostname)) {
- throw new AmbariException("Invalid host entry for ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ || !serviceComponentHosts.get(serviceName).get(componentName).containsKey(
+ hostname)) {
+ throw new AmbariException("Invalid entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
+ if (!serviceComponentHostsByHost.containsKey(hostname)) {
+ throw new AmbariException("Invalid host entry for ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- ServiceComponentHost schToRemove = null;
- for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
- if (sch.getServiceName().equals(serviceName)
+ ServiceComponentHost schToRemove = null;
+ for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
+ if (sch.getServiceName().equals(serviceName)
&& sch.getServiceComponentName().equals(componentName)
&& sch.getHostName().equals(hostname)) {
- schToRemove = sch;
- break;
- }
+ schToRemove = sch;
+ break;
}
+ }
- if (schToRemove == null) {
- LOG.warn("Unavailable in per host cache. ServiceComponentHost"
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (schToRemove == null) {
+ LOG.warn("Unavailable in per host cache. ServiceComponentHost"
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing a ServiceComponentHost"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName
- + ", serviceComponentName" + componentName
- + ", hostname= " + hostname);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing a ServiceComponentHost" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", serviceName=" + serviceName + ", serviceComponentName"
+ + componentName + ", hostname= " + hostname);
+ }
- serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
- if (schToRemove != null) {
- serviceComponentHostsByHost.get(hostname).remove(schToRemove);
- }
- } finally {
- writeLock.unlock();
+ serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
+ if (schToRemove != null) {
+ serviceComponentHostsByHost.get(hostname).remove(schToRemove);
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public long getClusterId() {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- return clusterEntity.getClusterId();
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
-
+ return clusterEntity.getClusterId();
}
@Override
@@ -827,19 +729,14 @@ public class ClusterImpl implements Cluster {
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- if (serviceComponentHostsByHost.containsKey(hostname)) {
- return new CopyOnWriteArrayList<ServiceComponentHost>(serviceComponentHostsByHost.get(hostname));
- }
- return new ArrayList<ServiceComponentHost>();
- } finally {
- readLock.unlock();
+ if (serviceComponentHostsByHost.containsKey(hostname)) {
+ return new CopyOnWriteArrayList<ServiceComponentHost>(
+ serviceComponentHostsByHost.get(hostname));
}
+ return new ArrayList<ServiceComponentHost>();
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -848,28 +745,20 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new Service"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + service.getName());
- }
- if (services.containsKey(service.getName())) {
- throw new AmbariException("Service already exists"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ + ", clusterId=" + getClusterId() + ", serviceName="
+ + service.getName());
+ }
+ if (services.containsKey(service.getName())) {
+ throw new AmbariException("Service already exists" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + service.getName());
- }
- this.services.put(service.getName(), service);
- } finally {
- writeLock.unlock();
}
+ services.put(service.getName(), service);
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -877,30 +766,21 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding a new Service"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", serviceName=" + serviceName);
- }
- if (services.containsKey(serviceName)) {
- throw new AmbariException("Service already exists"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ + ", clusterId=" + getClusterId() + ", serviceName=" + serviceName);
+ }
+ if (services.containsKey(serviceName)) {
+ throw new AmbariException("Service already exists" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + serviceName);
- }
- Service s = serviceFactory.createNew(this, serviceName);
- this.services.put(s.getName(), s);
- return s;
- } finally {
- writeLock.unlock();
}
+ Service s = serviceFactory.createNew(this, serviceName);
+ services.put(s.getName(), s);
+ return s;
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -909,19 +789,13 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- if (!services.containsKey(serviceName)) {
- throw new ServiceNotFoundException(getClusterName(), serviceName);
- }
- return services.get(serviceName);
- } finally {
- readLock.unlock();
+ if (!services.containsKey(serviceName)) {
+ throw new ServiceNotFoundException(getClusterName(), serviceName);
}
+ return services.get(serviceName);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -929,238 +803,175 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return new HashMap<String, Service>(services);
- } finally {
- readLock.unlock();
- }
+ return new HashMap<String, Service>(services);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public StackId getDesiredStackVersion() {
clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
- try {
- return desiredStackVersion;
- } finally {
- readLock.unlock();
- }
+ return desiredStackVersion;
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public void setDesiredStackVersion(StackId stackVersion) throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Changing DesiredStackVersion of Cluster"
- + ", clusterName=" + getClusterName()
- + ", clusterId=" + getClusterId()
- + ", currentDesiredStackVersion=" + this.desiredStackVersion
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Changing DesiredStackVersion of Cluster" + ", clusterName="
+ + getClusterName() + ", clusterId=" + getClusterId()
+ + ", currentDesiredStackVersion=" + desiredStackVersion
+ ", newDesiredStackVersion=" + stackVersion);
- }
- this.desiredStackVersion = stackVersion;
- clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion));
- clusterDAO.merge(clusterEntity);
- loadServiceConfigTypes();
- } finally {
- readWriteLock.writeLock().unlock();
}
+ desiredStackVersion = stackVersion;
+ clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion));
+ clusterDAO.merge(clusterEntity);
+ loadServiceConfigTypes();
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public StackId getCurrentStackVersion() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
- if (clusterStateEntity != null) {
- String stackVersion = clusterStateEntity.getCurrentStackVersion();
- if (stackVersion != null && !stackVersion.isEmpty()) {
- return gson.fromJson(stackVersion, StackId.class);
- }
+ ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
+ if (clusterStateEntity != null) {
+ String stackVersion = clusterStateEntity.getCurrentStackVersion();
+ if (stackVersion != null && !stackVersion.isEmpty()) {
+ return gson.fromJson(stackVersion, StackId.class);
}
- return null;
- } finally {
- readWriteLock.readLock().unlock();
}
+ return null;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
-
+
@Override
- public State getProvisioningState() {
+ public State getProvisioningState() {
clusterGlobalLock.readLock().lock();
+ State provisioningState = null;
try {
- readLock.lock();
- State provisioningState = null;
- try {
- provisioningState = clusterEntity.getProvisioningState();
-
- if( null == provisioningState )
- provisioningState = State.INIT;
-
- return provisioningState;
- } finally {
- readLock.unlock();
+ provisioningState = clusterEntity.getProvisioningState();
+
+ if (null == provisioningState) {
+ provisioningState = State.INIT;
}
+
+ return provisioningState;
} finally {
clusterGlobalLock.readLock().unlock();
}
- }
+ }
@Override
public void setProvisioningState(State provisioningState) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- clusterEntity.setProvisioningState(provisioningState);
- clusterDAO.merge(clusterEntity);
- } finally {
- writeLock.unlock();
- }
+ clusterEntity.setProvisioningState(provisioningState);
+ clusterDAO.merge(clusterEntity);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void setCurrentStackVersion(StackId stackVersion)
throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
- try {
- ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId());
- if (clusterStateEntity == null) {
- clusterStateEntity = new ClusterStateEntity();
- clusterStateEntity.setClusterId(clusterEntity.getClusterId());
- clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
- clusterStateEntity.setClusterEntity(clusterEntity);
- clusterStateDAO.create(clusterStateEntity);
- clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
- clusterEntity.setClusterStateEntity(clusterStateEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- } else {
- clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
- clusterStateDAO.merge(clusterStateEntity);
- clusterEntity = clusterDAO.merge(clusterEntity);
- }
- } catch (RollbackException e) {
- LOG.warn("Unable to set version " + stackVersion + " for cluster " + getClusterName());
- throw new AmbariException("Unable to set"
- + " version=" + stackVersion
+ ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId());
+ if (clusterStateEntity == null) {
+ clusterStateEntity = new ClusterStateEntity();
+ clusterStateEntity.setClusterId(clusterEntity.getClusterId());
+ clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
+ clusterStateEntity.setClusterEntity(clusterEntity);
+ clusterStateDAO.create(clusterStateEntity);
+ clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
+ clusterEntity.setClusterStateEntity(clusterStateEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
+ } else {
+ clusterStateEntity.setCurrentStackVersion(gson.toJson(stackVersion));
+ clusterStateDAO.merge(clusterStateEntity);
+ clusterEntity = clusterDAO.merge(clusterEntity);
+ }
+ } catch (RollbackException e) {
+ LOG.warn("Unable to set version " + stackVersion + " for cluster "
+ + getClusterName());
+ throw new AmbariException("Unable to set" + " version=" + stackVersion
+ " for cluster " + getClusterName(), e);
- } finally {
- writeLock.unlock();
- }
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public Map<String, Config> getConfigsByType(String configType) {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (!allConfigs.containsKey(configType))
- return null;
-
- return Collections.unmodifiableMap(allConfigs.get(configType));
- } finally {
- readWriteLock.writeLock().unlock();
+ if (!allConfigs.containsKey(configType)) {
+ return null;
}
+
+ return Collections.unmodifiableMap(allConfigs.get(configType));
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public Config getConfig(String configType, String versionTag) {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- if (!allConfigs.containsKey(configType)
+ if (!allConfigs.containsKey(configType)
|| !allConfigs.get(configType).containsKey(versionTag)) {
- return null;
- }
- return allConfigs.get(configType).get(versionTag);
- } finally {
- readWriteLock.readLock().unlock();
+ return null;
}
+ return allConfigs.get(configType).get(versionTag);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public void addConfig(Config config) {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (config.getType() == null
- || config.getType().isEmpty()) {
- throw new IllegalArgumentException("Config type cannot be empty");
- }
- if (!allConfigs.containsKey(config.getType())) {
- allConfigs.put(config.getType(), new HashMap<String, Config>());
- }
-
- allConfigs.get(config.getType()).put(config.getTag(), config);
- } finally {
- readWriteLock.writeLock().unlock();
+ if (config.getType() == null || config.getType().isEmpty()) {
+ throw new IllegalArgumentException("Config type cannot be empty");
+ }
+ if (!allConfigs.containsKey(config.getType())) {
+ allConfigs.put(config.getType(), new HashMap<String, Config>());
}
+
+ allConfigs.get(config.getType()).put(config.getTag(), config);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
public Collection<Config> getAllConfigs() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- List<Config> list = new ArrayList<Config>();
- for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) {
- for (Config config : entry.getValue().values()) {
- list.add(config);
- }
+ List<Config> list = new ArrayList<Config>();
+ for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) {
+ for (Config config : entry.getValue().values()) {
+ list.add(config);
}
- return Collections.unmodifiableList(list);
- } finally {
- readWriteLock.readLock().unlock();
}
+ return Collections.unmodifiableList(list);
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -1168,20 +979,14 @@ public class ClusterImpl implements Cluster {
throws AmbariException {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
+ Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
- return new ClusterResponse(getClusterId(),
- getClusterName(), getProvisioningState(), hosts.keySet(), hosts.size(),
+ return new ClusterResponse(getClusterId(), getClusterName(),
+ getProvisioningState(), hosts.keySet(), hosts.size(),
getDesiredStackVersion().getStackId(), getClusterHealthReport());
- } finally {
- readWriteLock.readLock().unlock();
- }
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@@ -1189,48 +994,36 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- sb.append("Cluster={ clusterName=").append(getClusterName())
- .append(", clusterId=").append(getClusterId())
- .append(", desiredStackVersion=").append(desiredStackVersion.getStackId())
- .append(", services=[ ");
- boolean first = true;
- for (Service s : services.values()) {
- if (!first) {
- sb.append(" , ");
- }
- first = false;
- sb.append("\n ");
- s.debugDump(sb);
- sb.append(' ');
- }
- sb.append(" ] }");
- } finally {
- readWriteLock.readLock().unlock();
- }
+ sb.append("Cluster={ clusterName=").append(getClusterName())
+ .append(", clusterId=").append(getClusterId())
+ .append(", desiredStackVersion=").append(desiredStackVersion.getStackId())
+ .append(", services=[ ");
+ boolean first = true;
+ for (Service s : services.values()) {
+ if (!first) {
+ sb.append(" , ");
+ }
+ first = false;
+ sb.append("\n ");
+ s.debugDump(sb);
+ sb.append(' ');
+ }
+ sb.append(" ] }");
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
@Transactional
public void refresh() {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- clusterEntity = clusterDAO.findById(clusterEntity.getClusterId());
- clusterDAO.refresh(clusterEntity);
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ clusterEntity = clusterDAO.findById(clusterEntity.getClusterId());
+ clusterDAO.refresh(clusterEntity);
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -1239,31 +1032,25 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- LOG.info("Deleting all services for cluster"
- + ", clusterName=" + getClusterName());
- for (Service service : services.values()) {
- if (!service.canBeRemoved()) {
- throw new AmbariException("Found non removable service when trying to"
- + " all services from cluster"
- + ", clusterName=" + getClusterName()
- + ", serviceName=" + service.getName());
- }
- }
-
- for (Service service : services.values()) {
- service.delete();
+ LOG.info("Deleting all services for cluster" + ", clusterName="
+ + getClusterName());
+ for (Service service : services.values()) {
+ if (!service.canBeRemoved()) {
+ throw new AmbariException(
+ "Found non removable service when trying to"
+ + " all services from cluster" + ", clusterName="
+ + getClusterName() + ", serviceName=" + service.getName());
}
+ }
- services.clear();
- } finally {
- readWriteLock.writeLock().unlock();
+ for (Service service : services.values()) {
+ service.delete();
}
+
+ services.clear();
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -1272,27 +1059,20 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- Service service = getService(serviceName);
- LOG.info("Deleting service for cluster"
+ Service service = getService(serviceName);
+ LOG.info("Deleting service for cluster" + ", clusterName="
+ + getClusterName() + ", serviceName=" + service.getName());
+ // FIXME check dependencies from meta layer
+ if (!service.canBeRemoved()) {
+ throw new AmbariException("Could not delete service from cluster"
+ ", clusterName=" + getClusterName()
+ ", serviceName=" + service.getName());
- // FIXME check dependencies from meta layer
- if (!service.canBeRemoved()) {
- throw new AmbariException("Could not delete service from cluster"
- + ", clusterName=" + getClusterName()
- + ", serviceName=" + service.getName());
- }
- service.delete();
- services.remove(serviceName);
- } finally {
- readWriteLock.writeLock().unlock();
}
+ service.delete();
+ services.remove(serviceName);
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Override
@@ -1300,21 +1080,15 @@ public class ClusterImpl implements Cluster {
loadServices();
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- boolean safeToRemove = true;
- for (Service service : services.values()) {
- if (!service.canBeRemoved()) {
- safeToRemove = false;
- LOG.warn("Found non removable service"
- + ", clusterName=" + getClusterName()
- + ", serviceName=" + service.getName());
- }
+ boolean safeToRemove = true;
+ for (Service service : services.values()) {
+ if (!service.canBeRemoved()) {
+ safeToRemove = false;
+ LOG.warn("Found non removable service" + ", clusterName="
+ + getClusterName() + ", serviceName=" + service.getName());
}
- return safeToRemove;
- } finally {
- readWriteLock.readLock().unlock();
}
+ return safeToRemove;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1325,19 +1099,13 @@ public class ClusterImpl implements Cluster {
public void delete() throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- refresh();
- deleteAllServices();
- removeEntities();
- allConfigs.clear();
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ refresh();
+ deleteAllServices();
+ removeEntities();
+ allConfigs.clear();
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
}
@Transactional
@@ -1352,43 +1120,40 @@ public class ClusterImpl implements Cluster {
@Override
public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs, String serviceConfigVersionNote) {
- if (null == user)
+ if (null == user) {
throw new NullPointerException("User must be specified.");
+ }
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (configs == null) {
- return null;
- }
+ if (configs == null) {
+ return null;
+ }
- Iterator<Config> configIterator = configs.iterator();
+ Iterator<Config> configIterator = configs.iterator();
- while (configIterator.hasNext()) {
- Config config = configIterator.next();
- if (config == null) {
- configIterator.remove();
- continue;
- }
- Config currentDesired = getDesiredConfigByType(config.getType());
+ while (configIterator.hasNext()) {
+ Config config = configIterator.next();
+ if (config == null) {
+ configIterator.remove();
+ continue;
+ }
+ Config currentDesired = getDesiredConfigByType(config.getType());
- // do not set if it is already the current
- if (null != currentDesired && currentDesired.getTag().equals(config.getTag())) {
- configIterator.remove();
- }
+ // do not set if it is already the current
+ if (null != currentDesired
+ && currentDesired.getTag().equals(config.getTag())) {
+ configIterator.remove();
}
+ }
- ServiceConfigVersionResponse serviceConfigVersionResponse =
- applyConfigs(configs, user, serviceConfigVersionNote);
+ ServiceConfigVersionResponse serviceConfigVersionResponse = applyConfigs(
+ configs, user, serviceConfigVersionNote);
- configHelper.invalidateStaleConfigsCache();
- return serviceConfigVersionResponse;
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ configHelper.invalidateStaleConfigsCache();
+ return serviceConfigVersionResponse;
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
@@ -1396,42 +1161,37 @@ public class ClusterImpl implements Cluster {
public Map<String, DesiredConfig> getDesiredConfigs() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>();
- Collection<String> types = new HashSet<String>();
-
- for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) {
- if (e.isSelected() > 0) {
- DesiredConfig c = new DesiredConfig();
- c.setServiceName(null);
- c.setTag(e.getTag());
- c.setUser(e.getUser());
- c.setVersion(allConfigs.get(e.getType()).get(e.getTag()).getVersion());
-
- map.put(e.getType(), c);
- types.add(e.getType());
- }
+ Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>();
+ Collection<String> types = new HashSet<String>();
+
+ for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) {
+ if (e.isSelected() > 0) {
+ DesiredConfig c = new DesiredConfig();
+ c.setServiceName(null);
+ c.setTag(e.getTag());
+ c.setUser(e.getUser());
+ c.setVersion(allConfigs.get(e.getType()).get(e.getTag()).getVersion());
+
+ map.put(e.getType(), c);
+ types.add(e.getType());
}
+ }
- if (!map.isEmpty()) {
- Map<String, List<HostConfigMapping>> hostMappingsByType =
- hostConfigMappingDAO.findSelectedHostsByTypes(clusterEntity.getClusterId(), types);
+ if (!map.isEmpty()) {
+ Map<String, List<HostConfigMapping>> hostMappingsByType = hostConfigMappingDAO.findSelectedHostsByTypes(
+ clusterEntity.getClusterId(), types);
- for (Entry<String, DesiredConfig> entry : map.entrySet()) {
- List<DesiredConfig.HostOverride> hostOverrides = new ArrayList<DesiredConfig.HostOverride>();
- for (HostConfigMapping mappingEntity : hostMappingsByType.get(entry.getKey())) {
- hostOverrides.add(new DesiredConfig.HostOverride(mappingEntity.getHostName(),
- mappingEntity.getVersion()));
- }
- entry.getValue().setHostOverrides(hostOverrides);
+ for (Entry<String, DesiredConfig> entry : map.entrySet()) {
+ List<DesiredConfig.HostOverride> hostOverrides = new ArrayList<DesiredConfig.HostOverride>();
+ for (HostConfigMapping mappingEntity : hostMappingsByType.get(entry.getKey())) {
+ hostOverrides.add(new DesiredConfig.HostOverride(
+ mappingEntity.getHostName(), mappingEntity.getVersion()));
}
+ entry.getValue().setHostOverrides(hostOverrides);
}
-
- return map;
- } finally {
- readWriteLock.readLock().unlock();
}
+
+ return map;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1525,19 +1285,16 @@ public class ClusterImpl implements Cluster {
@Override
public ServiceConfigVersionResponse setServiceConfigVersion(String serviceName, Long version, String user, String note) throws AmbariException {
- if (null == user)
+ if (null == user) {
throw new NullPointerException("User must be specified.");
+ }
clusterGlobalLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- ServiceConfigVersionResponse serviceConfigVersionResponse = applyServiceConfigVersion(serviceName, version, user, note);
- configHelper.invalidateStaleConfigsCache();
- return serviceConfigVersionResponse;
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ ServiceConfigVersionResponse serviceConfigVersionResponse = applyServiceConfigVersion(
+ serviceName, version, user, note);
+ configHelper.invalidateStaleConfigsCache();
+ return serviceConfigVersionResponse;
} finally {
clusterGlobalLock.writeLock().unlock();
}
@@ -1547,21 +1304,17 @@ public class ClusterImpl implements Cluster {
public Map<String, Collection<ServiceConfigVersionResponse>> getActiveServiceConfigVersions() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- Map<String, Collection<ServiceConfigVersionResponse>> map = new HashMap<String, Collection<ServiceConfigVersionResponse>>();
+ Map<String, Collection<ServiceConfigVersionResponse>> map = new HashMap<String, Collection<ServiceConfigVersionResponse>>();
- Set<ServiceConfigVersionResponse> responses = getActiveServiceConfigVersionSet();
- for (ServiceConfigVersionResponse response : responses) {
- if (map.get(response.getServiceName()) == null) {
- map.put(response.getServiceName(), new ArrayList<ServiceConfigVersionResponse>());
- }
- map.get(response.getServiceName()).add(response);
+ Set<ServiceConfigVersionResponse> responses = getActiveServiceConfigVersionSet();
+ for (ServiceConfigVersionResponse response : responses) {
+ if (map.get(response.getServiceName()) == null) {
+ map.put(response.getServiceName(),
+ new ArrayList<ServiceConfigVersionResponse>());
}
- return map;
- } finally {
- readWriteLock.readLock().unlock();
+ map.get(response.getServiceName()).add(response);
}
+ return map;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1572,34 +1325,30 @@ public class ClusterImpl implements Cluster {
public List<ServiceConfigVersionResponse> getServiceConfigVersions() {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>();
- Set<Long> activeIds = getActiveServiceConfigVersionIds();
-
- for (ServiceConfigEntity serviceConfigEntity : serviceConfigDAO.getServiceConfigs(getClusterId())) {
- ServiceConfigVersionResponse serviceConfigVersionResponse =
- convertToServiceConfigVersionResponse(serviceConfigEntity);
-
- serviceConfigVersionResponse.setHosts(serviceConfigEntity.getHostNames());
- serviceConfigVersionResponse.setConfigurations(new ArrayList<ConfigurationResponse>());
- serviceConfigVersionResponse.setIsCurrent(activeIds.contains(serviceConfigEntity.getServiceConfigId()));
-
- List<ClusterConfigEntity> clusterConfigEntities = serviceConfigEntity.getClusterConfigEntities();
- for (ClusterConfigEntity clusterConfigEntity : clusterConfigEntities) {
- Config config = allConfigs.get(clusterConfigEntity.getType()).get(clusterConfigEntity.getTag());
- serviceConfigVersionResponse.getConfigurations().add(new ConfigurationResponse(getClusterName(),
- config.getType(), config.getTag(), config.getVersion(), config.getProperties(),
- config.getPropertiesAttributes()));
- }
+ List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>();
+ Set<Long> activeIds = getActiveServiceConfigVersionIds();
+
+ for (ServiceConfigEntity serviceConfigEntity : serviceConfigDAO.getServiceConfigs(getClusterId())) {
+ ServiceConfigVersionResponse serviceConfigVersionResponse = convertToServiceConfigVersionResponse(serviceConfigEntity);
+
+ serviceConfigVersionResponse.setHosts(serviceConfigEntity.getHostNames());
+ serviceConfigVersionResponse.setConfigurations(new ArrayList<ConfigurationResponse>());
+ serviceConfigVersionResponse.setIsCurrent(activeIds.contains(serviceConfigEntity.getServiceConfigId()));
- serviceConfigVersionResponses.add(serviceConfigVersionResponse);
+ List<ClusterConfigEntity> clusterConfigEntities = serviceConfigEntity.getClusterConfigEntities();
+ for (ClusterConfigEntity clusterConfigEntity : clusterConfigEntities) {
+ Config config = allConfigs.get(clusterConfigEntity.getType()).get(
+ clusterConfigEntity.getTag());
+ serviceConfigVersionResponse.getConfigurations().add(
+ new ConfigurationResponse(getClusterName(), config.getType(),
+ config.getTag(), config.getVersion(), config.getProperties(),
+ config.getPropertiesAttributes()));
}
- return serviceConfigVersionResponses;
- } finally {
- readWriteLock.readLock().unlock();
+ serviceConfigVersionResponses.add(serviceConfigVersionResponse);
}
+
+ return serviceConfigVersionResponses;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -1656,7 +1405,7 @@ public class ClusterImpl implements Cluster {
serviceConfigVersionResponse.setClusterName(getClusterName());
serviceConfigVersionResponse.setServiceName(serviceConfigEntity.getServiceName());
serviceConfigVersionResponse.setVersion(serviceConfigEntity.getVersion());
- serviceConfigVersionResponse.setCreateTime(serviceConfigEntity.getCreateTimestamp());
+ serviceConfigVersionResponse.setCreateTime(serviceConfigEntity.getCreateTimestamp());
serviceConfigVersionResponse.setUserName(serviceConfigEntity.getUser());
serviceConfigVersionResponse.setNote(serviceConfigEntity.getNote());
@@ -1832,18 +1581,13 @@ public class ClusterImpl implements Cluster {
public Config getDesiredConfigByType(String configType) {
clusterGlobalLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) {
- if (e.isSelected() > 0 && e.getType().equals(configType)) {
- return getConfig(e.getType(), e.getTag());
- }
+ for (ClusterConfigMappingEntity e : clusterEntity.getConfigMappingEntities()) {
+ if (e.isSelected() > 0 && e.getType().equals(configType)) {
+ return getConfig(e.getType(), e.getTag());
}
-
- return null;
- } finally {
- readWriteLock.readLock().unlock();
}
+
+ return null;
} finally {
clusterGlobalLock.readLock().unlock();
}
@@ -2037,12 +1781,11 @@ public class ClusterImpl implements Cluster {
return chr;
}
-
+
@Override
public void addAlerts(Collection<Alert> alerts) {
+ clusterGlobalLock.writeLock().lock();
try {
- writeLock.lock();
-
for (final Alert alert : alerts) {
if (clusterAlerts.size() > 0) {
CollectionUtils.filter(clusterAlerts, new Predicate() {
@@ -2053,7 +1796,7 @@ public class ClusterImpl implements Cluster {
}
});
}
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("Adding alert for name={} service={}, on host={}",
alert.getName(), alert.getService(), alert.getHost());
@@ -2063,18 +1806,17 @@ public class ClusterImpl implements Cluster {
clusterAlerts.addAll(alerts);
} finally {
- writeLock.unlock();
+ clusterGlobalLock.writeLock().unlock();
}
}
-
+
@Override
public Collection<Alert> getAlerts() {
+ clusterGlobalLock.readLock().lock();
try {
- readLock.lock();
-
return Collections.unmodifiableSet(clusterAlerts);
} finally {
- readLock.unlock();
+ clusterGlobalLock.readLock().unlock();
}
}
[3/3] ambari git commit: AMBARI-9368 - Deadlock Between Dependent
Cluster/Service/Component/Host Implementations (jonathanhurley)
Posted by jo...@apache.org.
AMBARI-9368 - Deadlock Between Dependent Cluster/Service/Component/Host Implementations (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dd572d35
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dd572d35
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dd572d35
Branch: refs/heads/branch-1.7.0
Commit: dd572d3544eb6a36c78155ae88ac423a16922d00
Parents: 7566e57
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Wed Mar 4 20:24:50 2015 -0500
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Thu Mar 5 09:24:50 2015 -0500
----------------------------------------------------------------------
.../server/state/ServiceComponentImpl.java | 302 ++---
.../apache/ambari/server/state/ServiceImpl.java | 263 ++--
.../server/state/cluster/ClusterImpl.java | 1272 +++++++-----------
.../svccomphost/ServiceComponentHostImpl.java | 652 ++++-----
.../state/cluster/ClusterDeadlockTest.java | 357 +++++
ambari-web/package.json | 2 +-
6 files changed, 1292 insertions(+), 1556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd572d35/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
index 45ea1f9..de0943b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
@@ -18,12 +18,12 @@
package org.apache.ambari.server.state;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.assistedinject.Assisted;
-import com.google.inject.assistedinject.AssistedInject;
-import com.google.inject.persist.Transactional;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -42,11 +42,12 @@ import org.apache.ambari.server.state.cluster.ClusterImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import com.google.inject.persist.Transactional;
public class ServiceComponentImpl implements ServiceComponent {
@@ -77,15 +78,15 @@ public class ServiceComponentImpl implements ServiceComponent {
public ServiceComponentImpl(@Assisted Service service,
@Assisted String componentName, Injector injector) throws AmbariException {
injector.injectMembers(this);
- this.clusterGlobalLock = service.getClusterGlobalLock();
+ clusterGlobalLock = service.getClusterGlobalLock();
this.service = service;
- this.desiredStateEntity = new ServiceComponentDesiredStateEntity();
+ desiredStateEntity = new ServiceComponentDesiredStateEntity();
desiredStateEntity.setComponentName(componentName);
desiredStateEntity.setDesiredState(State.INIT);
setDesiredStackVersion(service.getDesiredStackVersion());
- this.hostComponents = new HashMap<String, ServiceComponentHost>();
+ hostComponents = new HashMap<String, ServiceComponentHost>();
StackId stackId = service.getDesiredStackVersion();
ComponentInfo compInfo = ambariMetaInfo.getComponentCategory(
@@ -99,10 +100,8 @@ public class ServiceComponentImpl implements ServiceComponent {
+ ", componentName=" + componentName
+ ", stackInfo=" + stackId.getStackId());
}
- this.isClientComponent = compInfo.isClient();
- this.isMasterComponent = compInfo.isMaster();
-
- init();
+ isClientComponent = compInfo.isClient();
+ isMasterComponent = compInfo.isMaster();
}
@AssistedInject
@@ -110,11 +109,11 @@ public class ServiceComponentImpl implements ServiceComponent {
@Assisted ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity,
Injector injector) throws AmbariException {
injector.injectMembers(this);
- this.clusterGlobalLock = service.getClusterGlobalLock();
+ clusterGlobalLock = service.getClusterGlobalLock();
this.service = service;
- this.desiredStateEntity = serviceComponentDesiredStateEntity;
+ desiredStateEntity = serviceComponentDesiredStateEntity;
- this.hostComponents = new HashMap<String, ServiceComponentHost>();
+ hostComponents = new HashMap<String, ServiceComponentHost>();
for (HostComponentStateEntity hostComponentStateEntity : desiredStateEntity.getHostComponentStateEntities()) {
HostComponentDesiredStateEntityPK pk = new HostComponentDesiredStateEntityPK();
pk.setClusterId(hostComponentStateEntity.getClusterId());
@@ -141,17 +140,12 @@ public class ServiceComponentImpl implements ServiceComponent {
+ ", componentName=" + getName()
+ ", stackInfo=" + stackId.getStackId());
}
- this.isClientComponent = compInfo.isClient();
- this.isMasterComponent = compInfo.isMaster();
+ isClientComponent = compInfo.isClient();
+ isMasterComponent = compInfo.isMaster();
persisted = true;
}
- private void init() {
- // TODO load during restart
- // initialize from DB
- }
-
@Override
public ReadWriteLock getClusterGlobalLock() {
return clusterGlobalLock;
@@ -159,47 +153,17 @@ public class ServiceComponentImpl implements ServiceComponent {
@Override
public String getName() {
- clusterGlobalLock.readLock().lock();
- try {
- readWriteLock.readLock().lock();
- try {
- return desiredStateEntity.getComponentName();
- } finally {
- readWriteLock.readLock().unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return desiredStateEntity.getComponentName();
}
@Override
public String getServiceName() {
- clusterGlobalLock.readLock().lock();
- try {
- readWriteLock.readLock().lock();
- try {
- return service.getName();
- } finally {
- readWriteLock.readLock().unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return service.getName();
}
@Override
public long getClusterId() {
- clusterGlobalLock.readLock().lock();
- try {
- readWriteLock.readLock().lock();
- try {
- return this.service.getClusterId();
- } finally {
- readWriteLock.readLock().unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return service.getClusterId();
}
@Override
@@ -271,7 +235,7 @@ public class ServiceComponentImpl implements ServiceComponent {
// FIXME need a better approach of caching components by host
ClusterImpl clusterImpl = (ClusterImpl) service.getCluster();
clusterImpl.addServiceComponentHost(hostComponent);
- this.hostComponents.put(hostComponent.getHostName(), hostComponent);
+ hostComponents.put(hostComponent.getHostName(), hostComponent);
} finally {
readWriteLock.writeLock().unlock();
}
@@ -310,7 +274,7 @@ public class ServiceComponentImpl implements ServiceComponent {
ClusterImpl clusterImpl = (ClusterImpl) service.getCluster();
clusterImpl.addServiceComponentHost(hostComponent);
- this.hostComponents.put(hostComponent.getHostName(), hostComponent);
+ hostComponents.put(hostComponent.getHostName(), hostComponent);
return hostComponent;
} finally {
@@ -332,7 +296,7 @@ public class ServiceComponentImpl implements ServiceComponent {
throw new ServiceComponentHostNotFoundException(getClusterName(),
getServiceName(), getName(), hostname);
}
- return this.hostComponents.get(hostname);
+ return hostComponents.get(hostname);
} finally {
readWriteLock.readLock().unlock();
}
@@ -343,149 +307,105 @@ public class ServiceComponentImpl implements ServiceComponent {
@Override
public State getDesiredState() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- return desiredStateEntity.getDesiredState();
- } finally {
- readWriteLock.readLock().unlock();
- }
+ return desiredStateEntity.getDesiredState();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@Override
public void setDesiredState(State state) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting DesiredState of Service"
- + ", clusterName=" + service.getCluster().getClusterName()
- + ", clusterId=" + service.getCluster().getClusterId()
- + ", serviceName=" + service.getName()
- + ", serviceComponentName=" + getName()
- + ", oldDesiredState=" + getDesiredState()
- + ", newDesiredState=" + state);
- }
- desiredStateEntity.setDesiredState(state);
- saveIfPersisted();
- } finally {
- readWriteLock.writeLock().unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting DesiredState of Service" + ", clusterName="
+ + service.getCluster().getClusterName() + ", clusterId="
+ + service.getCluster().getClusterId() + ", serviceName="
+ + service.getName() + ", serviceComponentName=" + getName()
+ + ", oldDesiredState=" + getDesiredState() + ", newDesiredState="
+ + state);
}
+ desiredStateEntity.setDesiredState(state);
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
}
@Override
public StackId getDesiredStackVersion() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- return gson.fromJson(desiredStateEntity.getDesiredStackVersion(), StackId.class);
- } finally {
- readWriteLock.readLock().unlock();
- }
+ return gson.fromJson(desiredStateEntity.getDesiredStackVersion(),
+ StackId.class);
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@Override
public void setDesiredStackVersion(StackId stackVersion) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting DesiredStackVersion of Service"
- + ", clusterName=" + service.getCluster().getClusterName()
- + ", clusterId=" + service.getCluster().getClusterId()
- + ", serviceName=" + service.getName()
- + ", serviceComponentName=" + getName()
- + ", oldDesiredStackVersion=" + getDesiredStackVersion()
- + ", newDesiredStackVersion=" + stackVersion);
- }
- desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
- saveIfPersisted();
- } finally {
- readWriteLock.writeLock().unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting DesiredStackVersion of Service" + ", clusterName="
+ + service.getCluster().getClusterName() + ", clusterId="
+ + service.getCluster().getClusterId() + ", serviceName="
+ + service.getName() + ", serviceComponentName=" + getName()
+ + ", oldDesiredStackVersion=" + getDesiredStackVersion()
+ + ", newDesiredStackVersion=" + stackVersion);
}
+ desiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
}
@Override
public ServiceComponentResponse convertToResponse() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- ServiceComponentResponse r = new ServiceComponentResponse(
- getClusterId(), service.getCluster().getClusterName(),
- service.getName(), getName(),
- getDesiredStackVersion().getStackId(),
- getDesiredState().toString(), getTotalCount(), getStartedCount(),
- getInstalledCount());
- return r;
- } finally {
- readWriteLock.readLock().unlock();
- }
+ ServiceComponentResponse r = new ServiceComponentResponse(getClusterId(),
+ service.getCluster().getClusterName(), service.getName(), getName(),
+ getDesiredStackVersion().getStackId(), getDesiredState().toString(),
+ getTotalCount(), getStartedCount(), getInstalledCount());
+ return r;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@Override
public String getClusterName() {
- clusterGlobalLock.readLock().lock();
- try {
- readWriteLock.readLock().lock();
- try {
- return service.getCluster().getClusterName();
- } finally {
- readWriteLock.readLock().unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return service.getCluster().getClusterName();
}
@Override
public void debugDump(StringBuilder sb) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- sb.append("ServiceComponent={ serviceComponentName=" + getName()
- + ", clusterName=" + service.getCluster().getClusterName()
- + ", clusterId=" + service.getCluster().getClusterId()
- + ", serviceName=" + service.getName()
- + ", desiredStackVersion=" + getDesiredStackVersion()
- + ", desiredState=" + getDesiredState().toString()
- + ", hostcomponents=[ ");
- boolean first = true;
- for (ServiceComponentHost sch : hostComponents.values()) {
- if (!first) {
- sb.append(" , ");
- first = false;
- }
- sb.append("\n ");
- sch.debugDump(sb);
- sb.append(" ");
+ sb.append("ServiceComponent={ serviceComponentName=" + getName()
+ + ", clusterName=" + service.getCluster().getClusterName()
+ + ", clusterId=" + service.getCluster().getClusterId()
+ + ", serviceName=" + service.getName() + ", desiredStackVersion="
+ + getDesiredStackVersion() + ", desiredState="
+ + getDesiredState().toString() + ", hostcomponents=[ ");
+ boolean first = true;
+ for (ServiceComponentHost sch : hostComponents.values()) {
+ if (!first) {
+ sb.append(" , ");
+ first = false;
}
- sb.append(" ] }");
- } finally {
- readWriteLock.readLock().unlock();
+ sb.append("\n ");
+ sch.debugDump(sb);
+ sb.append(" ");
}
+ sb.append(" ] }");
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@@ -506,12 +426,22 @@ public class ServiceComponentImpl implements ServiceComponent {
@Override
public void persist() {
- clusterGlobalLock.readLock().lock();
+ boolean clusterWriteLockAcquired = false;
+ if (!persisted) {
+ clusterGlobalLock.writeLock().lock();
+ clusterWriteLockAcquired = true;
+ }
+
try {
readWriteLock.writeLock().lock();
try {
if (!persisted) {
+ // persist the new cluster topology and then release the cluster lock
+ // as it has no more bearing on the rest of this persist() method
persistEntities();
+ clusterGlobalLock.writeLock().unlock();
+ clusterWriteLockAcquired = false;
+
refresh();
service.refresh();
persisted = true;
@@ -522,7 +452,9 @@ public class ServiceComponentImpl implements ServiceComponent {
readWriteLock.writeLock().unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ if (clusterWriteLockAcquired) {
+ clusterGlobalLock.writeLock().unlock();
+ }
}
}
@@ -541,52 +473,42 @@ public class ServiceComponentImpl implements ServiceComponent {
@Override
@Transactional
public void refresh() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (isPersisted()) {
- ServiceComponentDesiredStateEntityPK pk = new ServiceComponentDesiredStateEntityPK();
- pk.setComponentName(getName());
- pk.setClusterId(getClusterId());
- pk.setServiceName(getServiceName());
- // TODO: desiredStateEntity is assigned in unway, may be a bug
- desiredStateEntity = serviceComponentDesiredStateDAO.findByPK(pk);
- serviceComponentDesiredStateDAO.refresh(desiredStateEntity);
- }
- } finally {
- readWriteLock.writeLock().unlock();
+ if (isPersisted()) {
+ ServiceComponentDesiredStateEntityPK pk = new ServiceComponentDesiredStateEntityPK();
+ pk.setComponentName(getName());
+ pk.setClusterId(getClusterId());
+ pk.setServiceName(getServiceName());
+ // TODO: desiredStateEntity is assigned in unway, may be a bug
+ desiredStateEntity = serviceComponentDesiredStateDAO.findByPK(pk);
+ serviceComponentDesiredStateDAO.refresh(desiredStateEntity);
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
}
@Transactional
private void saveIfPersisted() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (isPersisted()) {
- serviceComponentDesiredStateDAO.merge(desiredStateEntity);
- }
- } finally {
- readWriteLock.writeLock().unlock();
+ if (isPersisted()) {
+ serviceComponentDesiredStateDAO.merge(desiredStateEntity);
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
}
@Override
public boolean isClientComponent() {
- return this.isClientComponent;
+ return isClientComponent;
}
@Override
public boolean isMasterComponent() {
- return this.isMasterComponent;
+ return isMasterComponent;
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/dd572d35/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
index 6279c2b..b110014 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
@@ -18,7 +18,6 @@
package org.apache.ambari.server.state;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
@@ -94,14 +93,14 @@ public class ServiceImpl implements Service {
this.cluster = cluster;
- this.components = new HashMap<String, ServiceComponent>();
+ components = new HashMap<String, ServiceComponent>();
StackId stackId = cluster.getDesiredStackVersion();
setDesiredStackVersion(stackId);
ServiceInfo sInfo = ambariMetaInfo.getServiceInfo(stackId.getStackName(),
stackId.getStackVersion(), serviceName);
- this.isClientOnlyService = sInfo.isClientOnlyService();
+ isClientOnlyService = sInfo.isClientOnlyService();
init();
}
@@ -115,9 +114,9 @@ public class ServiceImpl implements Service {
this.cluster = cluster;
//TODO check for null states?
- this.serviceDesiredStateEntity = serviceEntity.getServiceDesiredStateEntity();
+ serviceDesiredStateEntity = serviceEntity.getServiceDesiredStateEntity();
- this.components = new HashMap<String, ServiceComponent>();
+ components = new HashMap<String, ServiceComponent>();
if (!serviceEntity.getServiceComponentDesiredStateEntities().isEmpty()) {
for (ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity
@@ -139,7 +138,7 @@ public class ServiceImpl implements Service {
StackId stackId = getDesiredStackVersion();
ServiceInfo sInfo = ambariMetaInfo.getServiceInfo(stackId.getStackName(),
stackId.getStackVersion(), getName());
- this.isClientOnlyService = sInfo.isClientOnlyService();
+ isClientOnlyService = sInfo.isClientOnlyService();
persisted = true;
}
@@ -151,46 +150,21 @@ public class ServiceImpl implements Service {
@Override
public String getName() {
- clusterGlobalLock.readLock().lock();
- try {
- readWriteLock.readLock().lock();
- try {
- return serviceEntity.getServiceName();
- } finally {
- readWriteLock.readLock().unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return serviceEntity.getServiceName();
}
@Override
public long getClusterId() {
- clusterGlobalLock.readLock().lock();
- try {
- readWriteLock.readLock().lock();
- try {
- return cluster.getClusterId();
- } finally {
- readWriteLock.readLock().unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
- }
+ return cluster.getClusterId();
}
@Override
public Map<String, ServiceComponent> getServiceComponents() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- return new HashMap<String, ServiceComponent>(components);
- } finally {
- readWriteLock.readLock().unlock();
- }
+ return new HashMap<String, ServiceComponent>(components);
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@@ -234,7 +208,7 @@ public class ServiceImpl implements Service {
+ ", serviceName=" + getName()
+ ", serviceComponentName=" + component.getName());
}
- this.components.put(component.getName(), component);
+ components.put(component.getName(), component);
} finally {
readWriteLock.writeLock().unlock();
}
@@ -265,7 +239,7 @@ public class ServiceImpl implements Service {
+ ", serviceComponentName=" + serviceComponentName);
}
ServiceComponent component = serviceComponentFactory.createNew(this, serviceComponentName);
- this.components.put(component.getName(), component);
+ components.put(component.getName(), component);
return component;
} finally {
readWriteLock.writeLock().unlock();
@@ -273,134 +247,92 @@ public class ServiceImpl implements Service {
} finally {
clusterGlobalLock.writeLock().unlock();
}
-
-
}
@Override
public ServiceComponent getServiceComponent(String componentName)
throws AmbariException {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- if (!components.containsKey(componentName)) {
- throw new ServiceComponentNotFoundException(cluster.getClusterName(),
- getName(),
- componentName);
- }
- return this.components.get(componentName);
- } finally {
- readWriteLock.readLock().unlock();
+ if (!components.containsKey(componentName)) {
+ throw new ServiceComponentNotFoundException(cluster.getClusterName(),
+ getName(), componentName);
}
+ return components.get(componentName);
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
-
-
}
@Override
public State getDesiredState() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- return this.serviceDesiredStateEntity.getDesiredState();
- } finally {
- readWriteLock.readLock().unlock();
- }
+ return serviceDesiredStateEntity.getDesiredState();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
-
-
}
@Override
public void setDesiredState(State state) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting DesiredState of Service"
- + ", clusterName=" + cluster.getClusterName()
- + ", clusterId=" + cluster.getClusterId()
- + ", serviceName=" + getName()
- + ", oldDesiredState=" + this.getDesiredState()
- + ", newDesiredState=" + state);
- }
- this.serviceDesiredStateEntity.setDesiredState(state);
- saveIfPersisted();
- } finally {
- readWriteLock.writeLock().unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting DesiredState of Service" + ", clusterName="
+ + cluster.getClusterName() + ", clusterId="
+ + cluster.getClusterId() + ", serviceName=" + getName()
+ + ", oldDesiredState=" + getDesiredState() + ", newDesiredState="
+ + state);
}
+ serviceDesiredStateEntity.setDesiredState(state);
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
}
@Override
public StackId getDesiredStackVersion() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- return gson.fromJson(serviceDesiredStateEntity.getDesiredStackVersion(), StackId.class);
- } finally {
- readWriteLock.readLock().unlock();
- }
+ return gson.fromJson(serviceDesiredStateEntity.getDesiredStackVersion(),
+ StackId.class);
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@Override
public void setDesiredStackVersion(StackId stackVersion) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting DesiredStackVersion of Service"
- + ", clusterName=" + cluster.getClusterName()
- + ", clusterId=" + cluster.getClusterId()
- + ", serviceName=" + getName()
- + ", oldDesiredStackVersion=" + getDesiredStackVersion()
- + ", newDesiredStackVersion=" + stackVersion);
- }
- serviceDesiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
- saveIfPersisted();
- } finally {
- readWriteLock.writeLock().unlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting DesiredStackVersion of Service" + ", clusterName="
+ + cluster.getClusterName() + ", clusterId="
+ + cluster.getClusterId() + ", serviceName=" + getName()
+ + ", oldDesiredStackVersion=" + getDesiredStackVersion()
+ + ", newDesiredStackVersion=" + stackVersion);
}
+ serviceDesiredStateEntity.setDesiredStackVersion(gson.toJson(stackVersion));
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
-
-
}
@Override
public ServiceResponse convertToResponse() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- ServiceResponse r = new ServiceResponse(cluster.getClusterId(),
- cluster.getClusterName(),
- getName(),
- getDesiredStackVersion().getStackId(),
- getDesiredState().toString());
-
- r.setMaintenanceState(getMaintenanceState().name());
- return r;
- } finally {
- readWriteLock.readLock().unlock();
- }
+ ServiceResponse r = new ServiceResponse(cluster.getClusterId(),
+ cluster.getClusterName(), getName(),
+ getDesiredStackVersion().getStackId(), getDesiredState().toString());
+
+ r.setMaintenanceState(getMaintenanceState().name());
+ return r;
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@@ -411,32 +343,26 @@ public class ServiceImpl implements Service {
@Override
public void debugDump(StringBuilder sb) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.readLock().lock();
try {
- readWriteLock.readLock().lock();
- try {
- sb.append("Service={ serviceName=" + getName()
- + ", clusterName=" + cluster.getClusterName()
- + ", clusterId=" + cluster.getClusterId()
- + ", desiredStackVersion=" + getDesiredStackVersion()
- + ", desiredState=" + getDesiredState().toString()
- + ", components=[ ");
- boolean first = true;
- for (ServiceComponent sc : components.values()) {
- if (!first) {
- sb.append(" , ");
- }
- first = false;
- sb.append("\n ");
- sc.debugDump(sb);
- sb.append(" ");
+ sb.append("Service={ serviceName=" + getName() + ", clusterName="
+ + cluster.getClusterName() + ", clusterId=" + cluster.getClusterId()
+ + ", desiredStackVersion=" + getDesiredStackVersion()
+ + ", desiredState=" + getDesiredState().toString()
+ + ", components=[ ");
+ boolean first = true;
+ for (ServiceComponent sc : components.values()) {
+ if (!first) {
+ sb.append(" , ");
}
- sb.append(" ] }");
- } finally {
- readWriteLock.readLock().unlock();
+ first = false;
+ sb.append("\n ");
+ sc.debugDump(sb);
+ sb.append(" ");
}
+ sb.append(" ] }");
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.readLock().unlock();
}
}
@@ -453,12 +379,11 @@ public class ServiceImpl implements Service {
} finally {
clusterGlobalLock.readLock().unlock();
}
-
}
@Override
public void persist() {
- clusterGlobalLock.readLock().lock();
+ clusterGlobalLock.writeLock().lock();
try {
readWriteLock.writeLock().lock();
try {
@@ -474,9 +399,8 @@ public class ServiceImpl implements Service {
readWriteLock.writeLock().unlock();
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ clusterGlobalLock.writeLock().unlock();
}
-
}
@Transactional
@@ -504,27 +428,20 @@ public class ServiceImpl implements Service {
@Override
@Transactional
public void refresh() {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- readWriteLock.writeLock().lock();
- try {
- if (isPersisted()) {
- ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
- pk.setClusterId(getClusterId());
- pk.setServiceName(getName());
- serviceEntity = clusterServiceDAO.findByPK(pk);
- serviceDesiredStateEntity = serviceEntity.getServiceDesiredStateEntity();
- clusterServiceDAO.refresh(serviceEntity);
- serviceDesiredStateDAO.refresh(serviceDesiredStateEntity);
- }
- } finally {
- readWriteLock.writeLock().unlock();
+ if (isPersisted()) {
+ ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
+ pk.setClusterId(getClusterId());
+ pk.setServiceName(getName());
+ serviceEntity = clusterServiceDAO.findByPK(pk);
+ serviceDesiredStateEntity = serviceEntity.getServiceDesiredStateEntity();
+ clusterServiceDAO.refresh(serviceEntity);
+ serviceDesiredStateDAO.refresh(serviceDesiredStateEntity);
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
-
-
}
@Override
@@ -659,26 +576,20 @@ public class ServiceImpl implements Service {
clusterServiceDAO.removeByPK(pk);
}
-
+
@Override
public void setMaintenanceState(MaintenanceState state) {
- clusterGlobalLock.readLock().lock();
+ readWriteLock.writeLock().lock();
try {
- try {
- readWriteLock.writeLock().lock();
- serviceDesiredStateEntity.setMaintenanceState(state);
- saveIfPersisted();
- } finally {
- readWriteLock.writeLock().unlock();
- }
+ serviceDesiredStateEntity.setMaintenanceState(state);
+ saveIfPersisted();
} finally {
- clusterGlobalLock.readLock().unlock();
+ readWriteLock.writeLock().unlock();
}
}
-
+
@Override
public MaintenanceState getMaintenanceState() {
return serviceDesiredStateEntity.getMaintenanceState();
}
-
}