You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2015/06/20 20:06:43 UTC
[2/2] ambari git commit: AMBARI-12026. During rolling upgrade process,
Ambari becomes unresponsive. (mpapirkovskyy)
AMBARI-12026. During rolling upgrade process, Ambari becomes unresponsive. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e35b02e7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e35b02e7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e35b02e7
Branch: refs/heads/trunk
Commit: e35b02e7d20e8831ea41cb5a672185b9cbb3c9d9
Parents: b871984
Author: Myroslav Papirkovskyy <mp...@hortonworks.com>
Authored: Fri Jun 19 22:07:12 2015 +0300
Committer: Myroslav Papirkovskyy <mp...@hortonworks.com>
Committed: Sat Jun 20 15:15:24 2015 +0300
----------------------------------------------------------------------
.../apache/ambari/server/state/ConfigImpl.java | 292 +++++++++++++------
.../state/cluster/ClusterDeadlockTest.java | 105 ++++++-
.../server/testing/DeadlockWarningThread.java | 7 +-
3 files changed, 300 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e35b02e7/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
index 71604c1..912209d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
@@ -23,6 +23,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
@@ -46,11 +48,13 @@ public class ConfigImpl implements Config {
public static final String GENERATED_TAG_PREFIX = "generatedTag_";
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
private Cluster cluster;
private StackId stackId;
private String type;
- private String tag;
- private Long version;
+ private volatile String tag;
+ private volatile Long version;
private Map<String, String> properties;
private Map<String, Map<String, String>> propertiesAttributes;
private ClusterConfigEntity entity;
@@ -91,6 +95,9 @@ public class ConfigImpl implements Config {
stackId = new StackId(entity.getStack());
this.entity = entity;
+ propertiesAttributes = gson.<Map<String, Map<String, String>>>fromJson(entity.getAttributes(), Map.class);
+ properties = gson.<Map<String, String>>fromJson(entity.getData(), Map.class);
+
injector.injectMembers(this);
}
@@ -105,93 +112,186 @@ public class ConfigImpl implements Config {
* {@inheritDoc}
*/
@Override
- public synchronized StackId getStackId() {
- return stackId;
+ public StackId getStackId() {
+ readWriteLock.readLock().lock();
+ try {
+ return stackId;
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+
}
@Override
- public synchronized void setStackId(StackId stackId) {
- this.stackId = stackId;
+ public void setStackId(StackId stackId) {
+ readWriteLock.writeLock().lock();
+ try {
+ this.stackId = stackId;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
}
@Override
public String getType() {
- return type;
+ readWriteLock.readLock().lock();
+ try {
+ return type;
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+
}
@Override
- public synchronized String getTag() {
+ public String getTag() {
if (tag == null) {
- tag = GENERATED_TAG_PREFIX + getVersion();
+ readWriteLock.writeLock().lock();
+ try {
+ if (tag == null) {
+ tag = GENERATED_TAG_PREFIX + getVersion();
+ }
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ }
+
+ readWriteLock.readLock().lock();
+ try {
+
+ return tag;
+ } finally {
+ readWriteLock.readLock().unlock();
}
- return tag;
+
}
@Override
- public synchronized Long getVersion() {
+ public Long getVersion() {
if (version == null && cluster != null) {
- version = cluster.getNextConfigVersion(type);
+ readWriteLock.writeLock().lock();
+ try {
+ if (version == null) {
+ version = cluster.getNextConfigVersion(type); //pure DB calculation call, no cluster locking required
+ }
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
}
- return version;
- }
- @Override
- public synchronized Map<String, String> getProperties() {
- if (null != entity && null == properties) {
+ readWriteLock.readLock().lock();
+ try {
+ return version;
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
- properties = gson.<Map<String, String>>fromJson(entity.getData(), Map.class);
+ }
+ @Override
+ public Map<String, String> getProperties() {
+ readWriteLock.readLock().lock();
+ try {
+ return null == properties ? new HashMap<String, String>()
+ : new HashMap<String, String>(properties);
+ } finally {
+ readWriteLock.readLock().unlock();
}
- return null == properties ? new HashMap<String, String>()
- : new HashMap<String, String>(properties);
+
}
@Override
- public synchronized Map<String, Map<String, String>> getPropertiesAttributes() {
- if (null != entity && null == propertiesAttributes) {
- propertiesAttributes = gson.<Map<String, Map<String, String>>>fromJson(entity.getAttributes(), Map.class);
+ public Map<String, Map<String, String>> getPropertiesAttributes() {
+ readWriteLock.readLock().lock();
+ try {
+ return null == propertiesAttributes ? null : new HashMap<String, Map<String, String>>(propertiesAttributes);
+ } finally {
+ readWriteLock.readLock().unlock();
}
- return null == propertiesAttributes ? null : new HashMap<String, Map<String, String>>(propertiesAttributes);
+
}
@Override
- public synchronized void setTag(String tag) {
- this.tag = tag;
+ public void setTag(String tag) {
+ readWriteLock.writeLock().lock();
+ try {
+ this.tag = tag;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
}
@Override
- public synchronized void setVersion(Long version) {
- this.version = version;
+ public void setVersion(Long version) {
+ readWriteLock.writeLock().lock();
+ try {
+ this.version = version;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
}
@Override
- public synchronized void setProperties(Map<String, String> properties) {
- this.properties = properties;
+ public void setProperties(Map<String, String> properties) {
+ readWriteLock.writeLock().lock();
+ try {
+ this.properties = properties;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
}
@Override
public void setPropertiesAttributes(Map<String, Map<String, String>> propertiesAttributes) {
- this.propertiesAttributes = propertiesAttributes;
+ readWriteLock.writeLock().lock();
+ try {
+ this.propertiesAttributes = propertiesAttributes;
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
}
@Override
- public synchronized void updateProperties(Map<String, String> properties) {
- this.properties.putAll(properties);
+ public void updateProperties(Map<String, String> properties) {
+ readWriteLock.writeLock().lock();
+ try {
+ this.properties.putAll(properties);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+
}
@Override
- public synchronized List<Long> getServiceConfigVersions() {
- if (cluster == null || type == null || version == null) {
- return Collections.emptyList();
+ public List<Long> getServiceConfigVersions() {
+ readWriteLock.readLock().lock();
+ try {
+ if (cluster == null || type == null || version == null) {
+ return Collections.emptyList();
+ }
+ return serviceConfigDAO.getServiceConfigVersionsByConfig(cluster.getClusterId(), type, version);
+ } finally {
+ readWriteLock.readLock().unlock();
}
- return serviceConfigDAO.getServiceConfigVersionsByConfig(cluster.getClusterId(), type, version);
+
}
@Override
- public synchronized void deleteProperties(List<String> properties) {
- for (String key : properties) {
- this.properties.remove(key);
+ public void deleteProperties(List<String> properties) {
+ readWriteLock.writeLock().lock();
+ try {
+ for (String key : properties) {
+ this.properties.remove(key);
+ }
+ } finally {
+ readWriteLock.writeLock().unlock();
}
+
}
@Override
@@ -201,58 +301,70 @@ public class ConfigImpl implements Config {
@Override
@Transactional
- public synchronized void persist(boolean newConfig) {
- ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
-
- if (newConfig) {
- ClusterConfigEntity entity = new ClusterConfigEntity();
- entity.setClusterEntity(clusterEntity);
- entity.setClusterId(cluster.getClusterId());
- entity.setType(getType());
- entity.setVersion(getVersion());
- entity.setTag(getTag());
- entity.setTimestamp(new Date().getTime());
- entity.setStack(clusterEntity.getDesiredStack());
- entity.setData(gson.toJson(getProperties()));
-
- if (null != getPropertiesAttributes()) {
- entity.setAttributes(gson.toJson(getPropertiesAttributes()));
- }
-
- clusterDAO.createConfig(entity);
- clusterEntity.getClusterConfigEntities().add(entity);
-
- // save the entity, forcing a flush to ensure the refresh picks up the
- // newest data
- clusterDAO.merge(clusterEntity, true);
- cluster.refresh();
- } else {
- // only supporting changes to the properties
- ClusterConfigEntity entity = null;
-
- // find the existing configuration to update
- for (ClusterConfigEntity cfe : clusterEntity.getClusterConfigEntities()) {
- if (getTag().equals(cfe.getTag()) &&
- getType().equals(cfe.getType()) &&
- getVersion().equals(cfe.getVersion())) {
- entity = cfe;
- break;
+ public void persist(boolean newConfig) {
+ cluster.getClusterGlobalLock().writeLock().lock(); //null cluster is not expected, NPE anyway later in code
+ try {
+ readWriteLock.writeLock().lock();
+ try {
+ ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
+
+ if (newConfig) {
+ ClusterConfigEntity entity = new ClusterConfigEntity();
+ entity.setClusterEntity(clusterEntity);
+ entity.setClusterId(cluster.getClusterId());
+ entity.setType(getType());
+ entity.setVersion(getVersion());
+ entity.setTag(getTag());
+ entity.setTimestamp(new Date().getTime());
+ entity.setStack(clusterEntity.getDesiredStack());
+ entity.setData(gson.toJson(getProperties()));
+
+ if (null != getPropertiesAttributes()) {
+ entity.setAttributes(gson.toJson(getPropertiesAttributes()));
+ }
+
+ clusterDAO.createConfig(entity);
+ clusterEntity.getClusterConfigEntities().add(entity);
+
+ // save the entity, forcing a flush to ensure the refresh picks up the
+ // newest data
+ clusterDAO.merge(clusterEntity, true);
+ cluster.refresh();
+ } else {
+ // only supporting changes to the properties
+ ClusterConfigEntity entity = null;
+
+ // find the existing configuration to update
+ for (ClusterConfigEntity cfe : clusterEntity.getClusterConfigEntities()) {
+ if (getTag().equals(cfe.getTag()) &&
+ getType().equals(cfe.getType()) &&
+ getVersion().equals(cfe.getVersion())) {
+ entity = cfe;
+ break;
+ }
+ }
+
+ // if the configuration was found, then update it
+ if (null != entity) {
+ LOG.debug(
+ "Updating {} version {} with new configurations; a new version will not be created",
+ getType(), getVersion());
+
+ entity.setData(gson.toJson(getProperties()));
+
+ // save the entity, forcing a flush to ensure the refresh picks up the
+ // newest data
+ clusterDAO.merge(clusterEntity, true);
+ cluster.refresh();
+ }
}
+ } finally {
+ readWriteLock.writeLock().unlock();
}
-
- // if the configuration was found, then update it
- if (null != entity) {
- LOG.debug(
- "Updating {} version {} with new configurations; a new version will not be created",
- getType(), getVersion());
-
- entity.setData(gson.toJson(getProperties()));
-
- // save the entity, forcing a flush to ensure the refresh picks up the
- // newest data
- clusterDAO.merge(clusterEntity, true);
- cluster.refresh();
- }
+ } finally {
+ cluster.getClusterGlobalLock().writeLock().unlock();
}
+
}
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e35b02e7/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
index 08f9743..847de7d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
@@ -18,9 +18,27 @@
package org.apache.ambari.server.state.cluster;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigFactory;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
import org.apache.ambari.server.testing.DeadlockWarningThread;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -32,19 +50,6 @@ import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncLis
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.Host;
-import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.RepositoryVersionState;
-import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.ServiceComponent;
-import org.apache.ambari.server.state.ServiceComponentFactory;
-import org.apache.ambari.server.state.ServiceComponentHost;
-import org.apache.ambari.server.state.ServiceComponentHostFactory;
-import org.apache.ambari.server.state.ServiceFactory;
-import org.apache.ambari.server.state.StackId;
-import org.apache.ambari.server.state.State;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
@@ -85,6 +90,9 @@ public class ClusterDeadlockTest {
private ServiceComponentHostFactory serviceComponentHostFactory;
@Inject
+ private ConfigFactory configFactory;
+
+ @Inject
private OrmTestHelper helper;
private StackId stackId = new StackId("HDP-0.1");
@@ -117,6 +125,16 @@ public class ClusterDeadlockTest {
cluster.createClusterVersion(stackId,
stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
+ Config config1 = configFactory.createNew(cluster, "test-type1", new HashMap<String, String>(), new HashMap<String,
+ Map<String, String>>());
+ Config config2 = configFactory.createNew(cluster, "test-type2", new HashMap<String, String>(), new HashMap<String,
+ Map<String, String>>());
+ config1.persist();
+ config2.persist();
+ cluster.addConfig(config1);
+ cluster.addConfig(config2);
+ cluster.addDesiredConfig("test user", new HashSet<Config>(Arrays.asList(config1, config2)));
+
// 100 hosts
for (int i = 0; i < NUMBER_OF_HOSTS; i++) {
String hostName = "c64-" + i;
@@ -270,6 +288,67 @@ public class ClusterDeadlockTest {
}
}
+ @Test
+ public void testDeadlockWithConfigsUpdate() throws Exception {
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < NUMBER_OF_THREADS; i++) {
+ ClusterDesiredConfigsReaderThread readerThread = null;
+ for (int j = 0; j < NUMBER_OF_THREADS; j++) {
+ readerThread = new ClusterDesiredConfigsReaderThread();
+ threads.add(readerThread);
+ }
+ for (Config config : cluster.getAllConfigs()) {
+ ConfigUpdaterThread configUpdaterThread = new ConfigUpdaterThread(config);
+ threads.add(configUpdaterThread);
+ }
+
+ }
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ DeadlockWarningThread wt = new DeadlockWarningThread(threads);
+
+ while (true) {
+ if(!wt.isAlive()) {
+ break;
+ }
+ }
+ if (wt.isDeadlocked()){
+ Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked());
+ } else {
+ Assert.assertFalse(wt.isDeadlocked());
+ }
+
+
+ }
+
+
+ private final class ClusterDesiredConfigsReaderThread extends Thread {
+ @Override
+ public void run() {
+ for (int i =0; i<1500; i++) {
+ cluster.getDesiredConfigs();
+ }
+ }
+ }
+
+ private final class ConfigUpdaterThread extends Thread {
+ private Config config;
+
+ public ConfigUpdaterThread(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void run() {
+ for (int i =0; i<500; i++) {
+ config.persist(false);
+ }
+ }
+ }
+
/**
* The {@link ClusterReaderThread} reads from a cluster over and over again
* with a slight pause.
http://git-wip-us.apache.org/repos/asf/ambari/blob/e35b02e7/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
index b1237df..b18638a 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
@@ -95,7 +95,12 @@ public class DeadlockWarningThread extends Thread {
Set<Thread> activeThreads = new HashSet<Thread>();
for (Thread monTh : monitoredThreads) {
ThreadGroup group = monTh.getThreadGroup();
- Thread[] groupThreads = new Thread[group.activeCount()];
+ if (group == null) {
+ //expected if thread died, ignore it
+ continue;
+ }
+ int activeCount = group.activeCount();
+ Thread[] groupThreads = new Thread[activeCount];
group.enumerate(groupThreads, true);
activeThreads.addAll(Arrays.asList(groupThreads));
}