You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/10/19 19:06:08 UTC
[4/8] ambari git commit: Merge branch 'branch-feature-AMBARI-18456' into trunk
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index 74eaa62..c1655aa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -19,13 +19,13 @@
package org.apache.ambari.server.state.svccomphost;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -78,11 +78,12 @@ import org.apache.ambari.server.state.fsm.SingleArcTransition;
import org.apache.ambari.server.state.fsm.StateMachine;
import org.apache.ambari.server.state.fsm.StateMachineFactory;
import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
-import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.persist.Transactional;
@@ -92,33 +93,34 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceComponentHostImpl.class);
- private final ReadWriteLock clusterGlobalLock;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
private final ServiceComponent serviceComponent;
+
private final Host host;
- private volatile boolean persisted = false;
+
+ private final HostComponentStateDAO hostComponentStateDAO;
+
+ private final HostComponentDesiredStateDAO hostComponentDesiredStateDAO;
+
+ private final HostDAO hostDAO;
@Inject
- HostComponentStateDAO hostComponentStateDAO;
- @Inject
- HostComponentDesiredStateDAO hostComponentDesiredStateDAO;
- @Inject
- HostDAO hostDAO;
- @Inject
- RepositoryVersionDAO repositoryVersionDAO;
- @Inject
- ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO;
- @Inject
- Clusters clusters;
+ private RepositoryVersionDAO repositoryVersionDAO;
+
+ private final ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO;
+
+ private final Clusters clusters;
+
@Inject
- ConfigHelper helper;
+ private ConfigHelper helper;
+
@Inject
- AmbariMetaInfo ambariMetaInfo;
+ private AmbariMetaInfo ambariMetaInfo;
+
@Inject
- RepositoryVersionHelper repositoryVersionHelper;
+ private RepositoryVersionHelper repositoryVersionHelper;
/**
* Used for creating commands to send to the agents when alert definitions are
@@ -130,18 +132,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
/**
* Used to publish events relating to service CRUD operations.
*/
- @Inject
- private AmbariEventPublisher eventPublisher;
+ private final AmbariEventPublisher eventPublisher;
/**
* Data access object for stack.
*/
- @Inject
- private StackDAO stackDAO;
-
- // Only used when object state is not persisted
- private HostComponentStateEntity stateEntity;
- private HostComponentDesiredStateEntity desiredStateEntity;
+ private final StackDAO stackDAO;
/**
* The desired component state entity PK.
@@ -151,14 +147,19 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
/**
* Cache the generated id for host component state for fast lookups.
*/
- private Long hostComponentStateId;
+ private final Long hostComponentStateId;
private long lastOpStartTime;
private long lastOpEndTime;
private long lastOpLastUpdateTime;
- private Map<String, HostConfig> actualConfigs = new HashMap<String,
- HostConfig>();
- private List<Map<String, String>> processes = new ArrayList<Map<String, String>>();
+
+ private ConcurrentMap<String, HostConfig> actualConfigs = new ConcurrentHashMap<>();
+ private ImmutableList<Map<String, String>> processes = ImmutableList.of();
+
+ /**
+ * The name of the host (which should never, ever change)
+ */
+ private final String hostName;
private static final StateMachineFactory
<ServiceComponentHostImpl, State,
@@ -699,14 +700,9 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
}
private void resetLastOpInfo() {
- try {
- writeLock.lock();
- setLastOpStartTime(-1);
- setLastOpLastUpdateTime(-1);
- setLastOpEndTime(-1);
- } finally {
- writeLock.unlock();
- }
+ setLastOpStartTime(-1);
+ setLastOpLastUpdateTime(-1);
+ setLastOpEndTime(-1);
}
private void updateLastOpInfo(ServiceComponentHostEventType eventType,
@@ -741,8 +737,21 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@AssistedInject
public ServiceComponentHostImpl(@Assisted ServiceComponent serviceComponent,
- @Assisted String hostName, Injector injector) {
- injector.injectMembers(this);
+ @Assisted String hostName, Clusters clusters, StackDAO stackDAO, HostDAO hostDAO,
+ ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO,
+ HostComponentStateDAO hostComponentStateDAO,
+ HostComponentDesiredStateDAO hostComponentDesiredStateDAO,
+ AmbariEventPublisher eventPublisher) {
+
+ this.serviceComponent = serviceComponent;
+ this.hostName = hostName;
+ this.clusters = clusters;
+ this.stackDAO = stackDAO;
+ this.hostDAO = hostDAO;
+ this.serviceComponentDesiredStateDAO = serviceComponentDesiredStateDAO;
+ this.hostComponentStateDAO = hostComponentStateDAO;
+ this.hostComponentDesiredStateDAO = hostComponentDesiredStateDAO;
+ this.eventPublisher = eventPublisher;
if (serviceComponent.isClientComponent()) {
stateMachine = clientStateMachineFactory.make(this);
@@ -750,9 +759,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
stateMachine = daemonStateMachineFactory.make(this);
}
- this.serviceComponent = serviceComponent;
- clusterGlobalLock = serviceComponent.getClusterGlobalLock();
-
HostEntity hostEntity = null;
try {
host = clusters.getHost(hostName);
@@ -769,7 +775,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
StackEntity stackEntity = stackDAO.find(stackId.getStackName(),
stackId.getStackVersion());
- stateEntity = new HostComponentStateEntity();
+ HostComponentStateEntity stateEntity = new HostComponentStateEntity();
stateEntity.setClusterId(serviceComponent.getClusterId());
stateEntity.setComponentName(serviceComponent.getName());
stateEntity.setServiceName(serviceComponent.getServiceName());
@@ -779,7 +785,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
stateEntity.setUpgradeState(UpgradeState.NONE);
stateEntity.setCurrentStack(stackEntity);
- desiredStateEntity = new HostComponentDesiredStateEntity();
+ HostComponentDesiredStateEntity desiredStateEntity = new HostComponentDesiredStateEntity();
desiredStateEntity.setClusterId(serviceComponent.getClusterId());
desiredStateEntity.setComponentName(serviceComponent.getName());
desiredStateEntity.setServiceName(serviceComponent.getServiceName());
@@ -795,20 +801,40 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
desiredStateEntityPK = getHostComponentDesiredStateEntityPK(desiredStateEntity);
+ persistEntities(hostEntity, stateEntity, desiredStateEntity);
+
+ // publish the service component installed event
+ ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(getClusterId(),
+ stackId.getStackName(), stackId.getStackVersion(), getServiceName(),
+ getServiceComponentName(), getHostName(), isRecoveryEnabled());
+
+ eventPublisher.publish(event);
+
+ hostComponentStateId = stateEntity.getId();
+
resetLastOpInfo();
}
@AssistedInject
public ServiceComponentHostImpl(@Assisted ServiceComponent serviceComponent,
- @Assisted HostComponentStateEntity stateEntity,
- @Assisted HostComponentDesiredStateEntity desiredStateEntity,
- Injector injector) {
- injector.injectMembers(this);
- this.serviceComponent = serviceComponent;
- clusterGlobalLock = serviceComponent.getClusterGlobalLock();
+ @Assisted HostComponentStateEntity stateEntity,
+ @Assisted HostComponentDesiredStateEntity desiredStateEntity, Clusters clusters,
+ StackDAO stackDAO, HostDAO hostDAO,
+ ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO,
+ HostComponentStateDAO hostComponentStateDAO,
+ HostComponentDesiredStateDAO hostComponentDesiredStateDAO,
+ AmbariEventPublisher eventPublisher) {
- this.desiredStateEntity = desiredStateEntity;
- this.stateEntity = stateEntity;
+ hostName = stateEntity.getHostName();
+
+ this.serviceComponent = serviceComponent;
+ this.clusters = clusters;
+ this.stackDAO = stackDAO;
+ this.hostDAO = hostDAO;
+ this.serviceComponentDesiredStateDAO = serviceComponentDesiredStateDAO;
+ this.hostComponentStateDAO = hostComponentStateDAO;
+ this.hostComponentDesiredStateDAO = hostComponentDesiredStateDAO;
+ this.eventPublisher = eventPublisher;
desiredStateEntityPK = getHostComponentDesiredStateEntityPK(desiredStateEntity);
hostComponentStateId = stateEntity.getId();
@@ -828,8 +854,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
LOG.error("Host '{}' was not found " + stateEntity.getHostName());
throw new RuntimeException(e);
}
-
- persisted = true;
}
@Override
@@ -841,119 +865,81 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public void setState(State state) {
- writeLock.lock();
- try {
- stateMachine.setCurrentState(state);
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- getStateEntity().setCurrentState(state);
- saveComponentStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- writeLock.unlock();
+ stateMachine.setCurrentState(state);
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ stateEntity.setCurrentState(state);
+ stateEntity = hostComponentStateDAO.merge(stateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
}
@Override
public String getVersion() {
- readLock.lock();
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- return stateEntity.getVersion();
- } else {
- LOG.warn("Trying to fetch a member from an entity object that may " +
- "have been previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
- } finally {
- readLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ return stateEntity.getVersion();
+ } else {
+ LOG.warn("Trying to fetch a member from an entity object that may "
+ + "have been previously deleted, serviceName = " + getServiceName() + ", "
+ + "componentName = " + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
+
return null;
}
@Override
public void setVersion(String version) {
- writeLock.lock();
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- getStateEntity().setVersion(version);
- saveComponentStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
- } finally {
- writeLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ stateEntity.setVersion(version);
+ stateEntity = hostComponentStateDAO.merge(stateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
}
@Override
public SecurityState getSecurityState() {
- readLock.lock();
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- return getStateEntity().getSecurityState();
- } else {
- LOG.warn("Trying to fetch a member from an entity object that may " +
- "have been previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- readLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ return stateEntity.getSecurityState();
+ } else {
+ LOG.warn("Trying to fetch a member from an entity object that may "
+ + "have been previously deleted, serviceName = " + getServiceName() + ", "
+ + "componentName = " + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
+
return null;
}
@Override
public void setSecurityState(SecurityState securityState) {
- writeLock.lock();
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- getStateEntity().setSecurityState(securityState);
- saveComponentStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- writeLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ stateEntity.setSecurityState(securityState);
+ stateEntity = hostComponentStateDAO.merge(stateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
}
@Override
public SecurityState getDesiredSecurityState() {
- readLock.lock();
- try {
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- return getDesiredStateEntity().getSecurityState();
- } else {
- LOG.warn("Trying to fetch a member from an entity object that may " +
- "have been previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- readLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ return desiredStateEntity.getSecurityState();
+ } else {
+ LOG.warn("Trying to fetch a member from an entity object that may "
+ + "have been previously deleted, serviceName = " + getServiceName() + ", "
+ + "componentName = " + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
return null;
}
@@ -964,16 +950,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
throw new AmbariException("The security state must be an endpoint state");
}
- writeLock.lock();
- try {
- LOG.debug("Set DesiredSecurityState on serviceName = {} componentName = {} hostName = {} to {}",
+ LOG.debug("Set DesiredSecurityState on serviceName = {} componentName = {} hostName = {} to {}",
getServiceName(), getServiceComponentName(), getHostName(), securityState);
- getDesiredStateEntity().setSecurityState(securityState);
- saveComponentDesiredStateEntityIfPersisted();
- } finally {
- writeLock.unlock();
- }
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ desiredStateEntity.setSecurityState(securityState);
+ hostComponentDesiredStateDAO.merge(desiredStateEntity);
}
/**
@@ -987,41 +969,26 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
*/
@Override
public void setUpgradeState(UpgradeState upgradeState) {
- writeLock.lock();
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- stateEntity.setUpgradeState(upgradeState);
- saveComponentStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- writeLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ stateEntity.setUpgradeState(upgradeState);
+ stateEntity = hostComponentStateDAO.merge(stateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
}
@Override
public UpgradeState getUpgradeState() {
- readLock.lock();
-
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- return stateEntity.getUpgradeState();
- } else {
- LOG.warn("Trying to fetch a state entity from an object that may " +
- "have been previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- readLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ return stateEntity.getUpgradeState();
+ } else {
+ LOG.warn("Trying to fetch a state entity from an object that may "
+ + "have been previously deleted, serviceName = " + getServiceName() + ", "
+ + "componentName = " + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
return UpgradeState.NONE;
@@ -1037,30 +1004,26 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
+ ", event=" + event.toString());
}
State oldState = getState();
- clusterGlobalLock.readLock().lock();
try {
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- stateMachine.doTransition(event.getType(), event);
- getStateEntity().setCurrentState(stateMachine.getCurrentState());
- saveComponentStateEntityIfPersisted();
- // TODO Audit logs
- } catch (InvalidStateTransitionException e) {
- LOG.error("Can't handle ServiceComponentHostEvent event at"
- + " current state"
- + ", serviceComponentName=" + getServiceComponentName()
- + ", hostName=" + getHostName()
- + ", currentState=" + oldState
- + ", eventType=" + event.getType()
- + ", event=" + event);
- throw e;
- }
- } finally {
- writeLock.unlock();
+ stateMachine.doTransition(event.getType(), event);
+ HostComponentStateEntity stateEntity = getStateEntity();
+ stateEntity.setCurrentState(stateMachine.getCurrentState());
+ stateEntity = hostComponentStateDAO.merge(stateEntity);
+ // TODO Audit logs
+ } catch (InvalidStateTransitionException e) {
+ LOG.error("Can't handle ServiceComponentHostEvent event at"
+ + " current state"
+ + ", serviceComponentName=" + getServiceComponentName()
+ + ", hostName=" + getHostName()
+ + ", currentState=" + oldState
+ + ", eventType=" + event.getType()
+ + ", event=" + event);
+ throw e;
}
} finally {
- clusterGlobalLock.readLock().unlock();
+ writeLock.unlock();
}
if (!oldState.equals(getState())) {
@@ -1105,72 +1068,42 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* @return the lastOpStartTime
*/
public long getLastOpStartTime() {
- readLock.lock();
- try {
- return lastOpStartTime;
- } finally {
- readLock.unlock();
- }
+ return lastOpStartTime;
}
/**
* @param lastOpStartTime the lastOpStartTime to set
*/
public void setLastOpStartTime(long lastOpStartTime) {
- writeLock.lock();
- try {
- this.lastOpStartTime = lastOpStartTime;
- } finally {
- writeLock.unlock();
- }
+ this.lastOpStartTime = lastOpStartTime;
}
/**
* @return the lastOpEndTime
*/
public long getLastOpEndTime() {
- readLock.lock();
- try {
- return lastOpEndTime;
- } finally {
- readLock.unlock();
- }
+ return lastOpEndTime;
}
/**
* @param lastOpEndTime the lastOpEndTime to set
*/
public void setLastOpEndTime(long lastOpEndTime) {
- writeLock.lock();
- try {
- this.lastOpEndTime = lastOpEndTime;
- } finally {
- writeLock.unlock();
- }
+ this.lastOpEndTime = lastOpEndTime;
}
/**
* @return the lastOpLastUpdateTime
*/
public long getLastOpLastUpdateTime() {
- readLock.lock();
- try {
- return lastOpLastUpdateTime;
- } finally {
- readLock.unlock();
- }
+ return lastOpLastUpdateTime;
}
/**
* @param lastOpLastUpdateTime the lastOpLastUpdateTime to set
*/
public void setLastOpLastUpdateTime(long lastOpLastUpdateTime) {
- writeLock.lock();
- try {
- this.lastOpLastUpdateTime = lastOpLastUpdateTime;
- } finally {
- writeLock.unlock();
- }
+ this.lastOpLastUpdateTime = lastOpLastUpdateTime;
}
@Override
@@ -1190,225 +1123,164 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public StackId getStackVersion() {
- readLock.lock();
- try {
- HostComponentStateEntity schStateEntity = stateEntity;
- if (schStateEntity == null) {
- return new StackId();
- }
-
- StackEntity currentStackEntity = schStateEntity.getCurrentStack();
- return new StackId(currentStackEntity.getStackName(),
- currentStackEntity.getStackVersion());
- } finally {
- readLock.unlock();
+ HostComponentStateEntity schStateEntity = getStateEntity();
+ if (schStateEntity == null) {
+ return new StackId();
}
+
+ StackEntity currentStackEntity = schStateEntity.getCurrentStack();
+ return new StackId(currentStackEntity.getStackName(), currentStackEntity.getStackVersion());
}
@Override
public void setStackVersion(StackId stackId) {
StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
- writeLock.lock();
- try {
- HostComponentStateEntity stateEntity = getStateEntity();
- if (stateEntity != null) {
- stateEntity.setCurrentStack(stackEntity);
- saveComponentStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- writeLock.unlock();
+ HostComponentStateEntity stateEntity = getStateEntity();
+ if (stateEntity != null) {
+ stateEntity.setCurrentStack(stackEntity);
+ stateEntity = hostComponentStateDAO.merge(stateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
}
@Override
public State getDesiredState() {
- readLock.lock();
- try {
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- return desiredStateEntity.getDesiredState();
- } else {
- LOG.warn("Trying to fetch a member from an entity object that may " +
- "have been previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- readLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ return desiredStateEntity.getDesiredState();
+ } else {
+ LOG.warn("Trying to fetch a member from an entity object that may "
+ + "have been previously deleted, serviceName = " + getServiceName() + ", "
+ + "componentName = " + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
+
return null;
}
@Override
public void setDesiredState(State state) {
- writeLock.lock();
- try {
- LOG.debug("Set DesiredState on serviceName = {} componentName = {} hostName = {} to {} ",
+ LOG.debug("Set DesiredState on serviceName = {} componentName = {} hostName = {} to {} ",
getServiceName(), getServiceComponentName(), getHostName(), state);
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- desiredStateEntity.setDesiredState(state);
- saveComponentDesiredStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() +
- "hostName = " + getHostName());
- }
- } finally {
- writeLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ desiredStateEntity.setDesiredState(state);
+ hostComponentDesiredStateDAO.merge(desiredStateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + "hostName = " + getHostName());
}
}
@Override
public StackId getDesiredStackVersion() {
- readLock.lock();
- try {
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- StackEntity desiredStackEntity = desiredStateEntity.getDesiredStack();
- return new StackId(desiredStackEntity.getStackName(),
- desiredStackEntity.getStackVersion());
- } else {
- LOG.warn("Trying to fetch a member from an entity object that may " +
- "have been previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() + ", " +
- "hostName = " + getHostName());
- }
-
- } finally {
- readLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ StackEntity desiredStackEntity = desiredStateEntity.getDesiredStack();
+ return new StackId(desiredStackEntity.getStackName(), desiredStackEntity.getStackVersion());
+ } else {
+ LOG.warn("Trying to fetch a member from an entity object that may "
+ + "have been previously deleted, serviceName = " + getServiceName() + ", "
+ + "componentName = " + getServiceComponentName() + ", " + "hostName = " + getHostName());
}
return null;
}
@Override
public void setDesiredStackVersion(StackId stackId) {
- writeLock.lock();
- try {
- LOG.debug("Set DesiredStackVersion on serviceName = {} componentName = {} hostName = {} to {}",
+ LOG.debug("Set DesiredStackVersion on serviceName = {} componentName = {} hostName = {} to {}",
getServiceName(), getServiceComponentName(), getHostName(), stackId);
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- StackEntity stackEntity = stackDAO.find(stackId.getStackName(),
- stackId.getStackVersion());
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
- desiredStateEntity.setDesiredStack(stackEntity);
- saveComponentDesiredStateEntityIfPersisted();
- }
- } finally {
- writeLock.unlock();
+ desiredStateEntity.setDesiredStack(stackEntity);
+ hostComponentDesiredStateDAO.merge(desiredStateEntity);
}
}
@Override
public HostComponentAdminState getComponentAdminState() {
- readLock.lock();
- try {
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- HostComponentAdminState adminState = desiredStateEntity.getAdminState();
- if (adminState == null && !serviceComponent.isClientComponent()
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ HostComponentAdminState adminState = desiredStateEntity.getAdminState();
+ if (adminState == null && !serviceComponent.isClientComponent()
&& !serviceComponent.isMasterComponent()) {
- adminState = HostComponentAdminState.INSERVICE;
- }
- return adminState;
+ adminState = HostComponentAdminState.INSERVICE;
}
-
- } finally {
- readLock.unlock();
+ return adminState;
}
+
return null;
}
@Override
public void setComponentAdminState(HostComponentAdminState attribute) {
- writeLock.lock();
- try {
- LOG.debug("Set ComponentAdminState on serviceName = {} componentName = {} hostName = {} to {}",
+ LOG.debug("Set ComponentAdminState on serviceName = {} componentName = {} hostName = {} to {}",
getServiceName(), getServiceComponentName(), getHostName(), attribute);
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- desiredStateEntity.setAdminState(attribute);
- saveComponentDesiredStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() +
- "hostName = " + getHostName());
- }
- } finally {
- writeLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ desiredStateEntity.setAdminState(attribute);
+ hostComponentDesiredStateDAO.merge(desiredStateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + "hostName = " + getHostName());
}
}
@Override
public ServiceComponentHostResponse convertToResponse(Map<String, DesiredConfig> desiredConfigs) {
- clusterGlobalLock.readLock().lock();
- try {
- readLock.lock();
- try {
- HostComponentStateEntity hostComponentStateEntity = getStateEntity();
- if (null == hostComponentStateEntity) {
- LOG.warn("Could not convert ServiceComponentHostResponse to a response. It's possible that Host " + getHostName() + " was deleted.");
- return null;
- }
+ HostComponentStateEntity hostComponentStateEntity = getStateEntity();
+ if (null == hostComponentStateEntity) {
+ LOG.warn(
+ "Could not convert ServiceComponentHostResponse to a response. It's possible that Host {} was deleted.",
+ getHostName());
+ return null;
+ }
- String clusterName = serviceComponent.getClusterName();
- String serviceName = serviceComponent.getServiceName();
- String serviceComponentName = serviceComponent.getName();
- String hostName = getHostName();
- String publicHostName = getPublicHostName();
- String state = getState().toString();
- String stackId = getStackVersion().getStackId();
- String desiredState = getDesiredState().toString();
- String desiredStackId = getDesiredStackVersion().getStackId();
- HostComponentAdminState componentAdminState = getComponentAdminState();
- UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
-
- String displayName = null;
- try {
- ComponentInfo compInfo = ambariMetaInfo.getComponent(getStackVersion().getStackName(),
- getStackVersion().getStackVersion(), serviceName, serviceComponentName);
- displayName = compInfo.getDisplayName();
- } catch (AmbariException e) {
- displayName = serviceComponentName;
- }
+ String clusterName = serviceComponent.getClusterName();
+ String serviceName = serviceComponent.getServiceName();
+ String serviceComponentName = serviceComponent.getName();
+ String hostName = getHostName();
+ String publicHostName = getPublicHostName();
+ String state = getState().toString();
+ String stackId = getStackVersion().getStackId();
+ String desiredState = getDesiredState().toString();
+ String desiredStackId = getDesiredStackVersion().getStackId();
+ HostComponentAdminState componentAdminState = getComponentAdminState();
+ UpgradeState upgradeState = hostComponentStateEntity.getUpgradeState();
- ServiceComponentHostResponse r = new ServiceComponentHostResponse(
- clusterName, serviceName,
- serviceComponentName, displayName, hostName,
- publicHostName, state,
- stackId, desiredState,
- desiredStackId, componentAdminState);
+ String displayName = null;
+ try {
+ ComponentInfo compInfo = ambariMetaInfo.getComponent(getStackVersion().getStackName(),
+ getStackVersion().getStackVersion(), serviceName, serviceComponentName);
+ displayName = compInfo.getDisplayName();
+ } catch (AmbariException e) {
+ displayName = serviceComponentName;
+ }
- r.setActualConfigs(actualConfigs);
- r.setUpgradeState(upgradeState);
+ ServiceComponentHostResponse r = new ServiceComponentHostResponse(clusterName, serviceName,
+ serviceComponentName, displayName, hostName, publicHostName, state, stackId,
+ desiredState, desiredStackId, componentAdminState);
- try {
- r.setStaleConfig(helper.isStaleConfigs(this, desiredConfigs));
- } catch (Exception e) {
- LOG.error("Could not determine stale config", e);
- }
+ r.setActualConfigs(actualConfigs);
+ r.setUpgradeState(upgradeState);
- return r;
- } finally {
- readLock.unlock();
- }
- } finally {
- clusterGlobalLock.readLock().unlock();
+ try {
+ r.setStaleConfig(helper.isStaleConfigs(this, desiredConfigs));
+ } catch (Exception e) {
+ LOG.error("Could not determine stale config", e);
}
+
+ return r;
}
@Override
@@ -1418,102 +1290,35 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public void debugDump(StringBuilder sb) {
- readLock.lock();
- try {
- sb.append("ServiceComponentHost={ hostname=").append(getHostName()).append(
- ", serviceComponentName=").append(serviceComponent.getName()).append(
- ", clusterName=").append(serviceComponent.getClusterName()).append(
- ", serviceName=").append(serviceComponent.getServiceName()).append(
- ", desiredStackVersion=").append(getDesiredStackVersion()).append(
- ", desiredState=").append(getDesiredState()).append(", stackVersion=").append(
- getStackVersion()).append(", state=").append(getState()).append(
- ", securityState=").append(getSecurityState()).append(
- ", desiredSecurityState=").append(getDesiredSecurityState()).append(
- " }");
- } finally {
- readLock.unlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean isPersisted() {
- // a lock around this internal state variable is not required since we
- // have appropriate locks in the persist() method and this member is
- // only ever false under the condition that the object is new
- return persisted;
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * This method uses Java locks and then delegates to internal methods which
- * perform the JPA merges inside of a transaction. Because of this, a
- * transaction is not necessary before this calling this method.
- */
- @Override
- public void persist() {
- boolean clusterWriteLockAcquired = false;
- if (!persisted) {
- clusterGlobalLock.writeLock().lock();
- clusterWriteLockAcquired = true;
- }
-
- try {
- writeLock.lock();
- try {
- if (!persisted) {
- // persist the new cluster topology and then release the cluster lock
- // as it has no more bearing on the rest of this persist() method
- persistEntities();
- persisted = true;
-
- clusterGlobalLock.writeLock().unlock();
- clusterWriteLockAcquired = false;
-
- // these should still be done with the internal lock
- refresh();
- // There refresh calls are no longer needed with cached references
- // not used on getters/setters
- // NOTE: Refreshing parents is a bad pattern.
- //host.refresh();
- //serviceComponent.refresh();
-
- // publish the service component installed event
- StackId stackId = getDesiredStackVersion();
-
- ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(
- getClusterId(), stackId.getStackName(),
- stackId.getStackVersion(), getServiceName(), getServiceComponentName(), getHostName(),
- isRecoveryEnabled());
-
- eventPublisher.publish(event);
- } else {
- saveComponentStateEntityIfPersisted();
- saveComponentDesiredStateEntityIfPersisted();
- }
- } finally {
- writeLock.unlock();
- }
- } finally {
- if (clusterWriteLockAcquired) {
- clusterGlobalLock.writeLock().unlock();
- }
- }
+ sb.append("ServiceComponentHost={ hostname=").append(getHostName())
+ .append(", serviceComponentName=")
+ .append(serviceComponent.getName())
+ .append(", clusterName=")
+ .append(serviceComponent.getClusterName())
+ .append(", serviceName=")
+ .append(serviceComponent.getServiceName())
+ .append(", desiredStackVersion=")
+ .append(getDesiredStackVersion())
+ .append(", desiredState=")
+ .append(getDesiredState())
+ .append(", stackVersion=")
+ .append(getStackVersion())
+ .append(", state=")
+ .append(getState())
+ .append(", securityState=")
+ .append(getSecurityState())
+ .append(", desiredSecurityState=")
+ .append(getDesiredSecurityState())
+ .append(" }");
}
@Transactional
- protected void persistEntities() {
+ private void persistEntities(HostEntity hostEntity, HostComponentStateEntity stateEntity,
+ HostComponentDesiredStateEntity desiredStateEntity) {
ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.findByName(
serviceComponent.getClusterId(), serviceComponent.getServiceName(),
serviceComponent.getName());
- HostEntity hostEntity = hostDAO.findByName(getHostName());
- hostEntity.addHostComponentStateEntity(stateEntity);
- hostEntity.addHostComponentDesiredStateEntity(desiredStateEntity);
-
desiredStateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
desiredStateEntity.setHostEntity(hostEntity);
@@ -1526,100 +1331,32 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
serviceComponentDesiredStateEntity.getHostComponentDesiredStateEntities().add(
desiredStateEntity);
- HostComponentStateEntity stateEntity = hostComponentStateDAO.findByIndex(serviceComponent.getClusterId(),
- serviceComponent.getServiceName(), serviceComponent.getName(), hostEntity.getHostId());
-
- hostComponentStateId = stateEntity.getId();
-
- serviceComponentDesiredStateDAO.merge(serviceComponentDesiredStateEntity);
- hostDAO.merge(hostEntity);
- }
-
- @Override
- public void refresh() {
- writeLock.lock();
- try {
- getDesiredStateEntity();
- getStateEntity();
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Merges the encapsulated {@link HostComponentStateEntity} inside of a new transaction. This
- * method assumes that the appropriate write lock has already been acquired
- * from {@link #readWriteLock}.
- */
- @Transactional
- void saveComponentStateEntityIfPersisted() {
- if (isPersisted()) {
- hostComponentStateDAO.merge(stateEntity);
- }
- }
-
- /**
- * Merges the encapsulated {@link HostComponentDesiredStateEntity} inside of a new transaction. This
- * method assumes that the appropriate write lock has already been acquired
- * from {@link #readWriteLock}.
- */
- @Transactional
- void saveComponentDesiredStateEntityIfPersisted() {
- if (isPersisted()) {
- LOG.debug("Save desiredStateEntity serviceName = {} componentName = {} hostName = {} desiredState = {}",
- getServiceName(), getServiceComponentName(), getHostName(), desiredStateEntity.getDesiredState());
+ serviceComponentDesiredStateEntity = serviceComponentDesiredStateDAO.merge(
+ serviceComponentDesiredStateEntity);
- hostComponentDesiredStateDAO.merge(desiredStateEntity);
- }
+ hostEntity.addHostComponentStateEntity(stateEntity);
+ hostEntity.addHostComponentDesiredStateEntity(desiredStateEntity);
+ hostEntity = hostDAO.merge(hostEntity);
}
-
@Override
public boolean canBeRemoved() {
- clusterGlobalLock.readLock().lock();
- boolean schLockAcquired = false;
- try {
- // if unable to read, then writers are writing; cannot remove SCH
- schLockAcquired = readLock.tryLock();
-
- return schLockAcquired && (getState().isRemovableState());
-
- } finally {
- if (schLockAcquired) {
- readLock.unlock();
- }
- clusterGlobalLock.readLock().unlock();
- }
+ return getState().isRemovableState();
}
@Override
public void delete() {
boolean fireRemovalEvent = false;
- clusterGlobalLock.writeLock().lock();
+ writeLock.lock();
try {
- writeLock.lock();
- try {
- if (persisted) {
- removeEntities();
-
- // host must be re-loaded from db to refresh the cached JPA HostEntity
- // that references HostComponentDesiredStateEntity
- // and HostComponentStateEntity JPA entities
- host.refresh();
-
- persisted = false;
- fireRemovalEvent = true;
- }
-
- clusters.getCluster(getClusterName()).removeServiceComponentHost(this);
- } catch (AmbariException ex) {
- LOG.error("Unable to remove a service component from a host", ex);
- } finally {
- writeLock.unlock();
- }
+ removeEntities();
+ fireRemovalEvent = true;
+ clusters.getCluster(getClusterName()).removeServiceComponentHost(this);
+ } catch (AmbariException ex) {
+ LOG.error("Unable to remove a service component from a host", ex);
} finally {
- clusterGlobalLock.writeLock().unlock();
+ writeLock.unlock();
}
// publish event for the removal of the SCH after the removal is
@@ -1646,14 +1383,16 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
protected void removeEntities() {
HostComponentStateEntity stateEntity = getStateEntity();
if (stateEntity != null) {
- // make sure that the state entities are removed from the associated (detached) host entity
- // Also refresh before delete
- stateEntity.getHostEntity().removeHostComponentStateEntity(stateEntity);
+ HostEntity hostEntity = stateEntity.getHostEntity();
HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- desiredStateEntity.getHostEntity().removeHostComponentDesiredStateEntity(desiredStateEntity);
- hostComponentDesiredStateDAO.remove(desiredStateEntity);
+ // Make sure that the state entity is removed from its host entity
+ hostEntity.removeHostComponentStateEntity(stateEntity);
+ hostEntity.removeHostComponentDesiredStateEntity(desiredStateEntity);
+
+ hostDAO.merge(hostEntity);
+ hostComponentDesiredStateDAO.remove(desiredStateEntity);
hostComponentStateDAO.remove(stateEntity);
}
}
@@ -1670,56 +1409,43 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
return;
}
- writeLock.lock();
- try {
- LOG.debug("Updating actual config tags: " + configTags);
- actualConfigs = new HashMap<String, HostConfig>();
-
- for (Entry<String, Map<String, String>> entry : configTags.entrySet()) {
- String type = entry.getKey();
- Map<String, String> values = new HashMap<String, String>(
- entry.getValue());
-
- String tag = values.get(ConfigHelper.CLUSTER_DEFAULT_TAG);
- values.remove(ConfigHelper.CLUSTER_DEFAULT_TAG);
-
- HostConfig hc = new HostConfig();
- hc.setDefaultVersionTag(tag);
- actualConfigs.put(type, hc);
-
- if (!values.isEmpty()) {
- for (Entry<String, String> overrideEntry : values.entrySet()) {
- Long groupId = Long.parseLong(overrideEntry.getKey());
- hc.getConfigGroupOverrides().put(groupId, overrideEntry.getValue());
- if (!configGroupMap.containsKey(groupId)) {
- LOG.debug("Config group does not exist, id = " + groupId);
- }
+ LOG.debug("Updating configuration tags for {}: {}", hostName, configTags);
+ final ConcurrentMap<String, HostConfig> newActualConfigs = new ConcurrentHashMap<>();
+
+ for (Entry<String, Map<String, String>> entry : configTags.entrySet()) {
+ String type = entry.getKey();
+ Map<String, String> values = new HashMap<String, String>(entry.getValue());
+
+ String tag = values.get(ConfigHelper.CLUSTER_DEFAULT_TAG);
+ values.remove(ConfigHelper.CLUSTER_DEFAULT_TAG);
+
+ HostConfig hc = new HostConfig();
+ hc.setDefaultVersionTag(tag);
+ newActualConfigs.put(type, hc);
+
+ if (!values.isEmpty()) {
+ for (Entry<String, String> overrideEntry : values.entrySet()) {
+ Long groupId = Long.parseLong(overrideEntry.getKey());
+ hc.getConfigGroupOverrides().put(groupId, overrideEntry.getValue());
+ if (!configGroupMap.containsKey(groupId)) {
+ LOG.debug("Config group does not exist, id = " + groupId);
}
}
}
- } finally {
- writeLock.unlock();
}
+
+ // update internal stateful collection in an "atomic" manner
+ actualConfigs = newActualConfigs;
}
@Override
public Map<String, HostConfig> getActualConfigs() {
- readLock.lock();
- try {
- return actualConfigs;
- } finally {
- readLock.unlock();
- }
+ return actualConfigs;
}
@Override
public HostState getHostState() {
- readLock.lock();
- try {
- return host.getState();
- } finally {
- readLock.unlock();
- }
+ return host.getState();
}
@Override
@@ -1729,90 +1455,57 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public void setMaintenanceState(MaintenanceState state) {
- writeLock.lock();
- try {
- LOG.debug("Set MaintenanceState on serviceName = {} componentName = {} hostName = {} to {}",
+ LOG.debug("Set MaintenanceState on serviceName = {} componentName = {} hostName = {} to {}",
getServiceName(), getServiceComponentName(), getHostName(), state);
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- desiredStateEntity.setMaintenanceState(state);
- saveComponentDesiredStateEntityIfPersisted();
-
- // broadcast the maintenance mode change
- MaintenanceModeEvent event = new MaintenanceModeEvent(state, this);
- eventPublisher.publish(event);
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() +
- ", hostName = " + getHostName());
- }
- } finally {
- writeLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ desiredStateEntity.setMaintenanceState(state);
+ hostComponentDesiredStateDAO.merge(desiredStateEntity);
+
+ // broadcast the maintenance mode change
+ MaintenanceModeEvent event = new MaintenanceModeEvent(state, this);
+ eventPublisher.publish(event);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", hostName = " + getHostName());
}
}
@Override
public MaintenanceState getMaintenanceState() {
- readLock.lock();
- try {
- return getDesiredStateEntity().getMaintenanceState();
- } finally {
- readLock.unlock();
- }
+ return getDesiredStateEntity().getMaintenanceState();
}
@Override
public void setProcesses(List<Map<String, String>> procs) {
- writeLock.lock();
- try {
- processes = Collections.unmodifiableList(procs);
- } finally {
- writeLock.unlock();
- }
+ processes = ImmutableList.copyOf(procs);
}
@Override
public List<Map<String, String>> getProcesses() {
- readLock.lock();
- try {
- return processes;
- } finally {
- readLock.unlock();
- }
+ return processes;
}
@Override
public boolean isRestartRequired() {
- readLock.lock();
- try {
- return getDesiredStateEntity().isRestartRequired();
- } finally {
- readLock.unlock();
- }
+ return getDesiredStateEntity().isRestartRequired();
}
@Override
public void setRestartRequired(boolean restartRequired) {
- writeLock.lock();
- try {
- LOG.debug("Set RestartRequired on serviceName = {} componentName = {} hostName = {} to {}",
+ LOG.debug("Set RestartRequired on serviceName = {} componentName = {} hostName = {} to {}",
getServiceName(), getServiceComponentName(), getHostName(), restartRequired);
- HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
- if (desiredStateEntity != null) {
- desiredStateEntity.setRestartRequired(restartRequired);
- saveComponentDesiredStateEntityIfPersisted();
- } else {
- LOG.warn("Setting a member on an entity object that may have been " +
- "previously deleted, serviceName = " + getServiceName() + ", " +
- "componentName = " + getServiceComponentName() +
- ", hostName = " + getHostName());
- }
-
- } finally {
- writeLock.unlock();
+ HostComponentDesiredStateEntity desiredStateEntity = getDesiredStateEntity();
+ if (desiredStateEntity != null) {
+ desiredStateEntity.setRestartRequired(restartRequired);
+ hostComponentDesiredStateDAO.merge(desiredStateEntity);
+ } else {
+ LOG.warn("Setting a member on an entity object that may have been "
+ + "previously deleted, serviceName = " + getServiceName() + ", " + "componentName = "
+ + getServiceComponentName() + ", hostName = " + getHostName());
}
}
@@ -1867,30 +1560,27 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
final StackId stackId = cluster.getDesiredStackVersion();
final StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
- writeLock.lock();
- try {
- // Check if there is a Repo Version already for the version.
- // If it doesn't exist, will have to create it.
- repositoryVersion = repositoryVersionDAO.findByStackNameAndVersion(stackId.getStackName(), version);
-
- if (null == repositoryVersion) {
- repositoryVersion = createRepositoryVersion(version, stackId, stackInfo);
- }
+ // Check if there is a Repo Version already for the version.
+ // If it doesn't exist, will have to create it.
+ repositoryVersion = repositoryVersionDAO.findByStackNameAndVersion(stackId.getStackName(), version);
- final HostEntity host = hostDAO.findById(hostId);
- cluster.transitionHostVersionState(host, repositoryVersion, stackId);
- } finally {
- writeLock.unlock();
+ if (null == repositoryVersion) {
+ repositoryVersion = createRepositoryVersion(version, stackId, stackInfo);
}
+
+ final HostEntity host = hostDAO.findById(hostId);
+ cluster.transitionHostVersionState(host, repositoryVersion, stackId);
+
return repositoryVersion;
}
- // Get the cached desired state entity or load it fresh through the DAO.
+ /**
+ * Gets the desired state entity for this {@link ServiceComponentHost}.
+ *
+ * @return the desired state entity, or {@code null} if there is none.
+ */
private HostComponentDesiredStateEntity getDesiredStateEntity() {
- if (isPersisted()) {
- desiredStateEntity = hostComponentDesiredStateDAO.findByPK(desiredStateEntityPK);
- }
- return desiredStateEntity;
+ return hostComponentDesiredStateDAO.findByPK(desiredStateEntityPK);
}
/**
@@ -1900,11 +1590,7 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
* {@link ServiceComponentHost}, or {@code null} if there is none.
*/
private HostComponentStateEntity getStateEntity() {
- if (isPersisted()) {
- stateEntity = hostComponentStateDAO.findById(hostComponentStateId);
- }
-
- return stateEntity;
+ return hostComponentStateDAO.findById(hostComponentStateId);
}
// create a PK object from the given desired component state entity.
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index b3e3941..bba0325 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -742,7 +742,6 @@ public class TopologyManager {
if (null != rackInfoFromTemplate) {
host.setRackInfo(rackInfoFromTemplate);
- host.persist(); //todo this is required only if host is not persisted to database yet, is it really so?
try {
// todo: do we need this in case of blueprints?
ambariContext.getController().registerRackChange(ambariContext.getClusterName(topology.getClusterId()));
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
index 877e84d..d732edf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/RetryHelper.java
@@ -17,23 +17,25 @@
*/
package org.apache.ambari.server.utils;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
import org.eclipse.persistence.exceptions.DatabaseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
/**
* Provides utility methods to support operations retry
* TODO injection as Guice singleon, static for now to avoid major modifications
*/
public class RetryHelper {
protected final static Logger LOG = LoggerFactory.getLogger(RetryHelper.class);
+ private static Clusters s_clusters;
private static ThreadLocal<Set<Cluster>> affectedClusters = new ThreadLocal<Set<Cluster>>(){
@Override
@@ -44,7 +46,8 @@ public class RetryHelper {
private static int operationsRetryAttempts = 0;
- public static void init(int operationsRetryAttempts) {
+ public static void init(Clusters clusters, int operationsRetryAttempts) {
+ s_clusters = clusters;
RetryHelper.operationsRetryAttempts = operationsRetryAttempts;
}
@@ -82,7 +85,8 @@ public class RetryHelper {
public static void invalidateAffectedClusters() {
for (Cluster cluster : affectedClusters.get()) {
- cluster.invalidateData();
+ s_clusters.invalidate(cluster);
+ affectedClusters.get().remove(cluster);
}
}
@@ -90,7 +94,6 @@ public class RetryHelper {
RetryHelper.clearAffectedClusters();
int retryAttempts = RetryHelper.getOperationsRetryAttempts();
do {
-
try {
return command.call();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
index b212e93..ffca51d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java
@@ -103,7 +103,6 @@ public class ExecutionCommandWrapperTest {
clusters = injector.getInstance(Clusters.class);
clusters.addHost(HOST1);
- clusters.getHost(HOST1).persist();
clusters.addCluster(CLUSTER1, new StackId("HDP-0.1"));
Cluster cluster1 = clusters.getCluster(CLUSTER1);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
index bf9d0db..1ca777d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
@@ -112,10 +112,7 @@ public class TestActionDBAccessorImpl {
// Add this host's name since it is needed for server-side actions.
clusters.addHost(serverHostName);
- clusters.getHost(serverHostName).persist();
-
clusters.addHost(hostName);
- clusters.getHost(hostName).persist();
StackId stackId = new StackId("HDP-0.1");
clusters.addCluster(clusterName, stackId);
@@ -273,7 +270,6 @@ public class TestActionDBAccessorImpl {
for (int i = 0; i < 1000; i++) {
String hostName = "c64-" + i;
clusters.addHost(hostName);
- clusters.getHost(hostName).persist();
}
// create 1 request, 3 stages per host, each with 2 commands
@@ -460,7 +456,6 @@ public class TestActionDBAccessorImpl {
requestIds.add(requestId);
populateActionDB(db, hostName, requestId, stageId);
clusters.addHost("host2");
- clusters.getHost("host2").persist();
populateActionDB(db, hostName, requestId + 1, stageId);
List<Long> requestIdsResult =
db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false);
@@ -546,11 +541,8 @@ public class TestActionDBAccessorImpl {
s.setStageId(stageId);
clusters.addHost("host2");
- clusters.getHost("host2").persist();
clusters.addHost("host3");
- clusters.getHost("host3").persist();
clusters.addHost("host4");
- clusters.getHost("host4").persist();
s.addHostRoleExecutionCommand("host1", Role.HBASE_MASTER,
RoleCommand.START,
@@ -680,7 +672,6 @@ public class TestActionDBAccessorImpl {
String host = "host" + i;
clusters.addHost(host);
- clusters.getHost(host).persist();
s.addHostRoleExecutionCommand("host" + i, Role.HBASE_MASTER,
RoleCommand.START, null, "cluster1", "HBASE", false, false);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
index fa51f91..da54789 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
@@ -24,21 +24,19 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import junit.framework.Assert;
-
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.audit.AuditLogger;
-import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.publishers.JPAEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -57,7 +55,8 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.persist.PersistService;
import com.google.inject.persist.UnitOfWork;
-import static org.junit.Assert.assertNotNull;
+
+import junit.framework.Assert;
public class TestActionManager {
@@ -79,7 +78,6 @@ public class TestActionManager {
stageFactory = injector.getInstance(StageFactory.class);
clusters.addHost(hostname);
- clusters.getHost(hostname).persist();
StackId stackId = new StackId("HDP-0.1");
clusters.addCluster(clusterName, stackId);
unitOfWork = injector.getInstance(UnitOfWork.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
index dd741e9..674025c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
@@ -18,10 +18,12 @@
package org.apache.ambari.server.agent;
+import static org.easymock.EasyMock.createNiceMock;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import javax.persistence.EntityManager;
import javax.ws.rs.core.MediaType;
import org.apache.ambari.server.RandomPortJerseyTest;
@@ -35,6 +37,7 @@ import org.apache.ambari.server.agent.rest.AgentResource;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.DBAccessor;
+import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.security.SecurityHelper;
import org.apache.ambari.server.security.SecurityHelperImpl;
@@ -55,7 +58,6 @@ import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.ServiceImpl;
import org.apache.ambari.server.state.cluster.ClusterFactory;
import org.apache.ambari.server.state.cluster.ClusterImpl;
-import org.apache.ambari.server.state.cluster.ClustersImpl;
import org.apache.ambari.server.state.configgroup.ConfigGroup;
import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
import org.apache.ambari.server.state.configgroup.ConfigGroupImpl;
@@ -66,6 +68,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
import org.apache.ambari.server.state.scheduler.RequestExecutionImpl;
import org.apache.ambari.server.state.stack.OsFamily;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl;
+import org.apache.ambari.server.topology.PersistedState;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jettison.json.JSONException;
@@ -80,7 +83,6 @@ import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.persist.jpa.AmbariJpaPersistModule;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
@@ -296,7 +298,6 @@ public class AgentResourceTest extends RandomPortJerseyTest {
// The test will fail anyway
}
requestStaticInjection(AgentResource.class);
- bind(Clusters.class).to(ClustersImpl.class);
os_family = mock(OsFamily.class);
actionManager = mock(ActionManager.class);
ambariMetaInfo = mock(AmbariMetaInfo.class);
@@ -311,10 +312,13 @@ public class AgentResourceTest extends RandomPortJerseyTest {
bind(AmbariMetaInfo.class).toInstance(ambariMetaInfo);
bind(DBAccessor.class).toInstance(mock(DBAccessor.class));
bind(HostRoleCommandDAO.class).toInstance(mock(HostRoleCommandDAO.class));
+ bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class));
+ bind(HostDAO.class).toInstance(createNiceMock(HostDAO.class));
+ bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
+ bind(PersistedState.class).toInstance(createNiceMock(PersistedState.class));
}
private void installDependencies() {
- install(new AmbariJpaPersistModule("ambari-javadb"));
install(new FactoryModuleBuilder().implement(
Cluster.class, ClusterImpl.class).build(ClusterFactory.class));
install(new FactoryModuleBuilder().implement(
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
index 299002b..6ebd11a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
@@ -59,7 +59,6 @@ import org.apache.ambari.server.actionmanager.StageFactory;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.audit.AuditLogger;
import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -160,13 +159,12 @@ public class HeartbeatProcessorTest {
public void testHeartbeatWithConfigs() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(SECONDARY_NAMENODE);
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -230,9 +228,8 @@ public class HeartbeatProcessorTest {
public void testRestartRequiredAfterInstallClient() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(HDFS_CLIENT);
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -296,13 +293,12 @@ public class HeartbeatProcessorTest {
public void testHeartbeatCustomCommandWithConfigs() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(SECONDARY_NAMENODE);
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -381,13 +377,12 @@ public class HeartbeatProcessorTest {
public void testHeartbeatCustomStartStop() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(SECONDARY_NAMENODE);
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -466,13 +461,12 @@ public class HeartbeatProcessorTest {
public void testStatusHeartbeat() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
- hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(SECONDARY_NAMENODE);
+ hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -544,7 +538,6 @@ public class HeartbeatProcessorTest {
public void testCommandReport() throws AmbariException {
injector.injectMembers(this);
clusters.addHost(DummyHostname1);
- clusters.getHost(DummyHostname1).persist();
StackId dummyStackId = new StackId(DummyStackId);
clusters.addCluster(DummyCluster, dummyStackId);
@@ -594,12 +587,11 @@ public class HeartbeatProcessorTest {
@Test
@SuppressWarnings("unchecked")
public void testCommandReportOnHeartbeatUpdatedState()
- throws AmbariException, InvalidStateTransitionException {
+ throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -714,12 +706,11 @@ public class HeartbeatProcessorTest {
@Test
@SuppressWarnings("unchecked")
- public void testUpgradeSpecificHandling() throws AmbariException, InvalidStateTransitionException {
+ public void testUpgradeSpecificHandling() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
@@ -812,9 +803,8 @@ public class HeartbeatProcessorTest {
public void testCommandStatusProcesses() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
ActionQueue aq = new ActionQueue();
@@ -891,16 +881,15 @@ public class HeartbeatProcessorTest {
@Test
@SuppressWarnings("unchecked")
- public void testComponentUpgradeCompleteReport() throws AmbariException, InvalidStateTransitionException {
+ public void testComponentUpgradeCompleteReport() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(HDFS_CLIENT);
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -976,16 +965,15 @@ public class HeartbeatProcessorTest {
@Test
@SuppressWarnings("unchecked")
- public void testComponentUpgradeFailReport() throws AmbariException, InvalidStateTransitionException {
+ public void testComponentUpgradeFailReport() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(HDFS_CLIENT);
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1097,16 +1085,15 @@ public class HeartbeatProcessorTest {
@Test
@SuppressWarnings("unchecked")
- public void testComponentUpgradeInProgressReport() throws AmbariException, InvalidStateTransitionException {
+ public void testComponentUpgradeInProgressReport() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
- hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(HDFS_CLIENT).persist();
- hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1).persist();
+ hdfs.addServiceComponent(DATANODE);
+ hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
+ hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(HDFS_CLIENT);
+ hdfs.getServiceComponent(HDFS_CLIENT).addServiceComponentHost(DummyHostname1);
ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
@@ -1307,13 +1294,12 @@ public class HeartbeatProcessorTest {
public void testComponentInProgressStatusSafeAfterStatusReport() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
Service hdfs = cluster.addService(HDFS);
- hdfs.persist();
- hdfs.addServiceComponent(DATANODE).persist();
+ hdfs.addServiceComponent(DATANODE);
hdfs.getServiceComponent(DATANODE).
- addServiceComponentHost(DummyHostname1).persist();
- hdfs.addServiceComponent(NAMENODE).persist();
+ addServiceComponentHost(DummyHostname1);
+ hdfs.addServiceComponent(NAMENODE);
hdfs.getServiceComponent(NAMENODE).
- addServiceComponentHost(DummyHostname1).persist();
+ addServiceComponentHost(DummyHostname1);
ActionQueue aq = new ActionQueue();
http://git-wip-us.apache.org/repos/asf/ambari/blob/f64fa722/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
index 6fc6892..43503fa 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
@@ -24,6 +24,7 @@ import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -60,6 +61,7 @@ import org.apache.ambari.server.state.ConfigFactory;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.RepositoryVersionState;
import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.cluster.ClustersImpl;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
@@ -135,7 +137,7 @@ public class HeartbeatTestHelper {
}
public Cluster getDummyCluster()
- throws AmbariException {
+ throws Exception {
Map<String, String> configProperties = new HashMap<String, String>() {{
put(RecoveryConfigHelper.RECOVERY_ENABLED_KEY, "true");
put(RecoveryConfigHelper.RECOVERY_TYPE_KEY, "AUTO_START");
@@ -154,7 +156,7 @@ public class HeartbeatTestHelper {
public Cluster getDummyCluster(String clusterName, String desiredStackId,
Map<String, String> configProperties, Set<String> hostNames)
- throws AmbariException {
+ throws Exception {
StackEntity stackEntity = stackDAO.find(HDP_22_STACK.getStackName(), HDP_22_STACK.getStackVersion());
org.junit.Assert.assertNotNull(stackEntity);
@@ -177,6 +179,14 @@ public class HeartbeatTestHelper {
StackId stackId = new StackId(desiredStackId);
+ // because this test method goes around the Clusters business object, we
+ // forcefully refresh the internal state so that any tests which
+ // incorrectly use Clusters after calling this won't be affected
+ Clusters clusters = injector.getInstance(Clusters.class);
+ Method method = ClustersImpl.class.getDeclaredMethod("loadClustersAndHosts");
+ method.setAccessible(true);
+ method.invoke(clusters);
+
Cluster cluster = clusters.getCluster(clusterName);
cluster.setDesiredStackVersion(stackId);
@@ -203,12 +213,12 @@ public class HeartbeatTestHelper {
clusters.addHost(hostName);
Host host = clusters.getHost(hostName);
host.setHostAttributes(hostAttributes);
- host.persist();
HostEntity hostEntity = hostDAO.findByName(hostName);
Assert.assertNotNull(hostEntity);
hostEntities.add(hostEntity);
}
+
clusterEntity.setHostEntities(hostEntities);
clusters.mapHostsToCluster(hostNames, clusterName);