You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/10/07 19:14:11 UTC
[14/32] ambari git commit: AMBARI-18513. Optimize ClustersDeadlockTest (aonishuk)
AMBARI-18513. Optimize ClustersDeadlockTest (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a901d8a7
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a901d8a7
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a901d8a7
Branch: refs/heads/branch-dev-patch-upgrade
Commit: a901d8a7a8f21cf9e6055a6669883abe47d5e370
Parents: 62dc775
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Wed Oct 5 14:46:46 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Wed Oct 5 14:46:46 2016 +0300
----------------------------------------------------------------------
.../state/cluster/ClustersDeadlockTest.java | 137 +++++++++++++------
1 file changed, 97 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/a901d8a7/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
index a0a6444..190f64d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
@@ -22,8 +22,10 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.inject.Provider;
import junit.framework.Assert;
import org.apache.ambari.server.AmbariException;
@@ -69,6 +71,9 @@ public class ClustersDeadlockTest {
private final AtomicInteger hostNameCounter = new AtomicInteger(0);
+ private CountDownLatch writerStoppedSignal;
+ private CountDownLatch readerStoppedSignal;
+
private final StackId stackId = new StackId("HDP-0.1");
@Inject
@@ -109,6 +114,9 @@ public class ClustersDeadlockTest {
// install HDFS
installService("HDFS");
+
+ writerStoppedSignal = new CountDownLatch(NUMBER_OF_THREADS);
+ readerStoppedSignal = new CountDownLatch(NUMBER_OF_THREADS);
}
@After
@@ -117,6 +125,42 @@ public class ClustersDeadlockTest {
}
/**
+ * Launches reader and writer threads simultaneously to check for a deadlock.
+ * The number of reader threads and the number of writer threads launched
+ * are each equal to {@code numberOfThreads}. This method expects the reader
+ * and writer threads to use {@code readerStoppedSignal} and
+ * {@code writerStoppedSignal} correctly.
+ *
+ * Reader threads should be stopped after writer threads are finished.
+ */
+ private void doLoadTest(Provider<? extends Thread> readerProvider,
+ Provider<? extends Thread> writerProvider,
+ final int numberOfThreads,
+ CountDownLatch writerStoppedSignal,
+ CountDownLatch readerStoppedSignal) throws Exception {
+ List<Thread> writerThreads = new ArrayList<Thread>();
+ for (int i = 0; i < numberOfThreads; i++) {
+ Thread readerThread = readerProvider.get();
+ Thread writerThread = writerProvider.get();
+
+ writerThreads.add(writerThread);
+
+ readerThread.start();
+ writerThread.start();
+ }
+
+ for (Thread writerThread : writerThreads) {
+ writerThread.join();
+ // Notify that one writer thread is stopped
+ writerStoppedSignal.countDown();
+ }
+
+ // All writer threads are stopped. Reader threads should finish now.
+ // Wait for all reader threads to stop
+ readerStoppedSignal.await();
+ }
+
+ /**
* Tests that no deadlock exists when adding hosts while reading from the
* cluster.
*
@@ -124,21 +168,20 @@ public class ClustersDeadlockTest {
*/
@Test(timeout = 40000)
public void testDeadlockWhileMappingHosts() throws Exception {
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < NUMBER_OF_THREADS; i++) {
- ClusterReaderThread readerThread = new ClusterReaderThread();
- ClustersHostMapperThread writerThread = new ClustersHostMapperThread();
-
- threads.add(readerThread);
- threads.add(writerThread);
+ Provider<ClustersHostMapperThread> clustersHostMapperThreadFactory =
+ new Provider<ClustersHostMapperThread>() {
- readerThread.start();
- writerThread.start();
- }
+ @Override
+ public ClustersHostMapperThread get() {
+ return new ClustersHostMapperThread();
+ }
+ };
- for (Thread thread : threads) {
- thread.join();
- }
+ doLoadTest(new ClusterReaderThreadFactory(),
+ clustersHostMapperThreadFactory,
+ NUMBER_OF_THREADS,
+ writerStoppedSignal,
+ readerStoppedSignal);
Assert.assertEquals(NUMBER_OF_THREADS * NUMBER_OF_HOSTS,
clusters.getHostsForCluster(CLUSTER_NAME).size());
@@ -154,21 +197,20 @@ public class ClustersDeadlockTest {
@Test(timeout = 40000)
public void testDeadlockWhileMappingHostsWithExistingServices()
throws Exception {
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < NUMBER_OF_THREADS; i++) {
- ClusterReaderThread readerThread = new ClusterReaderThread();
- ClustersHostAndComponentMapperThread writerThread = new ClustersHostAndComponentMapperThread();
-
- threads.add(readerThread);
- threads.add(writerThread);
+ Provider<ClustersHostAndComponentMapperThread> clustersHostAndComponentMapperThreadFactory =
+ new Provider<ClustersHostAndComponentMapperThread>() {
- readerThread.start();
- writerThread.start();
- }
+ @Override
+ public ClustersHostAndComponentMapperThread get() {
+ return new ClustersHostAndComponentMapperThread();
+ }
+ };
- for (Thread thread : threads) {
- thread.join();
- }
+ doLoadTest(new ClusterReaderThreadFactory(),
+ clustersHostAndComponentMapperThreadFactory,
+ NUMBER_OF_THREADS,
+ writerStoppedSignal,
+ readerStoppedSignal);
}
/**
@@ -179,26 +221,33 @@ public class ClustersDeadlockTest {
*/
@Test(timeout = 40000)
public void testDeadlockWhileUnmappingHosts() throws Exception {
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < NUMBER_OF_THREADS; i++) {
- ClusterReaderThread readerThread = new ClusterReaderThread();
- ClustersHostUnMapperThread writerThread = new ClustersHostUnMapperThread();
-
- threads.add(readerThread);
- threads.add(writerThread);
+ Provider<ClustersHostUnMapperThread> clustersHostUnMapperThreadFactory =
+ new Provider<ClustersHostUnMapperThread>() {
- readerThread.start();
- writerThread.start();
- }
+ @Override
+ public ClustersHostUnMapperThread get() {
+ return new ClustersHostUnMapperThread();
+ }
+ };
- for (Thread thread : threads) {
- thread.join();
- }
+ doLoadTest(new ClusterReaderThreadFactory(),
+ clustersHostUnMapperThreadFactory,
+ NUMBER_OF_THREADS,
+ writerStoppedSignal,
+ readerStoppedSignal);
Assert.assertEquals(0,
clusters.getHostsForCluster(CLUSTER_NAME).size());
}
+ private final class ClusterReaderThreadFactory implements Provider<ClusterReaderThread> {
+
+ @Override
+ public ClusterReaderThread get() {
+ return new ClusterReaderThread();
+ }
+ }
+
/**
* The {@link ClusterReaderThread} reads from a cluster over and over again
* with a slight pause.
@@ -211,12 +260,20 @@ public class ClustersDeadlockTest {
@Override
public void run() {
try {
- for (int i = 0; i < 1000; i++) {
+ // Repeat until all writer threads have stopped
+ while (true) {
+ if (writerStoppedSignal.getCount() == 0) {
+ break;
+ }
+
cluster.convertToResponse();
Thread.sleep(10);
}
} catch (Exception exception) {
throw new RuntimeException(exception);
+ } finally {
+ // Notify that one reader was stopped
+ readerStoppedSignal.countDown();
}
}
}