You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by vb...@apache.org on 2015/06/11 14:12:06 UTC

ambari git commit: AMBARI-11692. ClusterDeadlockTest unit test fails.(vbrodetskyi)

Repository: ambari
Updated Branches:
  refs/heads/trunk ed7a08434 -> 3dd9ae471


AMBARI-11692. ClusterDeadlockTest unit test fails.(vbrodetskyi)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3dd9ae47
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3dd9ae47
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3dd9ae47

Branch: refs/heads/trunk
Commit: 3dd9ae471abfd6df36bf6fe902c3de9c81c57579
Parents: ed7a084
Author: Vitaly Brodetskyi <vb...@hortonworks.com>
Authored: Thu Jun 11 06:49:22 2015 +0300
Committer: Vitaly Brodetskyi <vb...@hortonworks.com>
Committed: Thu Jun 11 06:49:22 2015 +0300

----------------------------------------------------------------------
 .../state/cluster/ClusterDeadlockTest.java      |  48 ++++-
 .../server/testing/DeadlockWarningThread.java   | 135 +++++++++++++
 .../server/testing/DeadlockedThreadsTest.java   | 188 +++++++++++++++++++
 3 files changed, 361 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/3dd9ae47/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
index 2f064ab..08f9743 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.state.cluster;
 
+import org.apache.ambari.server.testing.DeadlockWarningThread;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -143,7 +144,7 @@ public class ClusterDeadlockTest {
    *
    * @throws Exception
    */
-  @Test(timeout = 30000)
+  @Test()
   public void testDeadlockBetweenImplementations() throws Exception {
     Service service = cluster.getService("HDFS");
     ServiceComponent nameNodeComponent = service.getServiceComponent("NAMENODE");
@@ -168,8 +169,17 @@ public class ClusterDeadlockTest {
       threads.add(thread);
     }
 
-    for (Thread thread : threads) {
-      thread.join();
+    DeadlockWarningThread wt = new DeadlockWarningThread(threads);
+    
+    while (true) {
+      if(!wt.isAlive()) {
+          break;
+      }
+    }
+    if (wt.isDeadlocked()){
+      Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked());
+    } else {
+      Assert.assertFalse(wt.isDeadlocked());
     }
   }
 
@@ -179,7 +189,7 @@ public class ClusterDeadlockTest {
    *
    * @throws Exception
    */
-  @Test(timeout = 35000)
+  @Test()
   public void testAddingHostComponentsWhileReading() throws Exception {
     Service service = cluster.getService("HDFS");
     ServiceComponent nameNodeComponent = service.getServiceComponent("NAMENODE");
@@ -194,8 +204,17 @@ public class ClusterDeadlockTest {
       threads.add(thread);
     }
 
-    for (Thread thread : threads) {
-      thread.join();
+    DeadlockWarningThread wt = new DeadlockWarningThread(threads);
+    
+    while (true) {
+      if(!wt.isAlive()) {
+          break;
+      }
+    }
+    if (wt.isDeadlocked()){
+      Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked());
+    } else {
+      Assert.assertFalse(wt.isDeadlocked());
     }
   }
 
@@ -205,7 +224,7 @@ public class ClusterDeadlockTest {
    *
    * @throws Exception
    */
-  @Test(timeout = 75000)
+  @Test()
   public void testDeadlockWhileRestartingComponents() throws Exception {
     // for each host, install both components
     List<ServiceComponentHost> serviceComponentHosts = new ArrayList<ServiceComponentHost>();
@@ -236,9 +255,18 @@ public class ClusterDeadlockTest {
       clusterWriterThread.start();
       schWriterThread.start();
     }
-
-    for (Thread thread : threads) {
-      thread.join();
+    
+    DeadlockWarningThread wt = new DeadlockWarningThread(threads);
+    
+    while (true) {
+      if(!wt.isAlive()) {
+          break;
+      }
+    }
+    if (wt.isDeadlocked()){
+      Assert.assertFalse(wt.getErrorMessages().toString(), wt.isDeadlocked());
+    } else {
+      Assert.assertFalse(wt.isDeadlocked());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/3dd9ae47/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
new file mode 100644
index 0000000..b1237df
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockWarningThread.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+* http://www.apache.org/licenses/LICENSE-2.0
+ * 
+* Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.ambari.server.testing;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
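+/**
+ * Deadlock-monitoring thread. Please note: this class must not be used outside of tests.
+ * It reports a deadlock when the JVM finds monitor-deadlocked threads, or when the live threads
+ * of the monitored threads' groups show identical stack traces on two consecutive polls.
+ */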
+public class DeadlockWarningThread extends Thread {
+
+  private Thread parentThread;
+  private final List<String> errorMessages;
+  private static final int MAX_STACK_DEPTH = 30;
+  private Collection<Thread> monitoredThreads = null;
+  private boolean deadlocked = false;
+  private static final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
+  private String stacktrace = "";
+
+  public List<String> getErrorMessages() {
+    return errorMessages;
+  }
+
+  public boolean isDeadlocked() {
+    return deadlocked;
+  }
+
+  public DeadlockWarningThread(Collection<Thread> monitoredThreads) {
+    this.errorMessages = new ArrayList<String>();
+    this.monitoredThreads = monitoredThreads;
+    parentThread = Thread.currentThread();
+    start();
+  }
+
+  public String getThreadsStacktraces(long[] ids) {
+    StringBuilder errBuilder = new StringBuilder();
+      for (long id : ids) {
+        ThreadInfo ti = mbean.getThreadInfo(id, MAX_STACK_DEPTH);
+        errBuilder.append("Deadlocked Thread:\n").
+                append("------------------\n").
+                append(ti).append('\n');
+        for (StackTraceElement ste : ti.getStackTrace()) {
+          errBuilder.append('\t').append(ste);
+        }
+        errBuilder.append('\n');
+      }
+    return errBuilder.toString();
+  }
+ 
+  
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException ex) {
+      }
+      long[] ids = mbean.findMonitorDeadlockedThreads();
+      StringBuilder errBuilder = new StringBuilder();
+      if (ids != null && ids.length > 0) {
+          errBuilder.append(getThreadsStacktraces(ids));
+          errorMessages.add(errBuilder.toString());
+          System.out.append(errBuilder.toString());
+         //Exit if deadlocks have been found         
+          deadlocked = true;
+          break;
+      } else {
+        // Exit if no thread in the monitored threads' groups (other than this thread and the parent) is alive
+        boolean hasLive = false;
+        Set<Thread> activeThreads = new HashSet<Thread>();
+        for (Thread monTh : monitoredThreads) {
+          ThreadGroup group = monTh.getThreadGroup();
+          Thread[] groupThreads = new Thread[group.activeCount()];
+          group.enumerate(groupThreads, true);
+          activeThreads.addAll(Arrays.asList(groupThreads));
+        }
+        activeThreads.remove(Thread.currentThread());
+        activeThreads.remove(parentThread);
+        Set<Long> idSet = new TreeSet<Long>();
+        for (Thread activeThread : activeThreads) {
+          if (activeThread.isAlive()) {
+            hasLive = true;
+            idSet.add(activeThread.getId());
+          }     
+        }
+        long[] tid = new long[idSet.size()];
+        if (!hasLive) {
+          deadlocked = false;
+          break;
+        } else {
+          int cnt = 0;
+          for (Long id : idSet) {
+            tid[cnt] = id;
+            cnt++;
+          }
+          String currentStackTrace = getThreadsStacktraces(tid);
+          if (stacktrace.equals(currentStackTrace)) {
+            errBuilder.append(currentStackTrace);
+            errorMessages.add(currentStackTrace);
+            System.out.append(currentStackTrace);
+            deadlocked = true;
+            break;            
+          } else {
+            stacktrace = currentStackTrace;
+          }
+        }
+      }
+    }
+  }  
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/3dd9ae47/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java
new file mode 100644
index 0000000..6ce3acc
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/testing/DeadlockedThreadsTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+* http://www.apache.org/licenses/LICENSE-2.0
+ * 
+* Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.ambari.server.testing;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ * Test if DeadlockWarningThread can detect deadlocks properly
+ */
+public class DeadlockedThreadsTest {
+  static Set<Thread> threads = new HashSet<Thread>();
+  
+  /**
+   *
+   * Should detect a "flat" deadlock (threads blocked on synchronized monitors).
+   * This test is disabled (it has no @Test annotation) because it does not test any production code.
+   * If DeadlockWarningThread is changed and the change needs to be tested,
+   * add the @Test annotation to this method.
+   */
+  public void testDeadlocks() {
+    
+    // deadlock with three locks
+    Object lock1 = new String("lock1");
+    Object lock2 = new String("lock2");
+    Object lock3 = new String("lock3");
+
+    
+    threads.add(new DeadlockingThread("t1", lock1, lock2));
+    threads.add(new DeadlockingThread("t2", lock2, lock3));
+    threads.add(new DeadlockingThread("t3", lock3, lock1));
+
+    // deadlock with two locks
+    Object lock4 = new String("lock4");
+    Object lock5 = new String("lock5");
+
+    threads.add(new DeadlockingThread("t4", lock4, lock5));
+    threads.add(new DeadlockingThread("t5", lock5, lock4));
+    DeadlockWarningThread wt = new DeadlockWarningThread(threads);
+
+    
+
+    while (true) {
+      if(!wt.isAlive()) {
+          break;
+      }
+    }
+    if (wt.isDeadlocked()){
+      Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked());
+      Assert.assertFalse(wt.getErrorMessages().toString().equals(""));
+    } else {
+      Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked());
+    }
+    
+  }
+
+  /**
+   *
+   * Should detect a "hidden" deadlock (threads using ReentrantReadWriteLock instead of monitors).
+   * This test is disabled (it has no @Test annotation) because it does not test any production code.
+   * If DeadlockWarningThread is changed and the change needs to be tested,
+   * add the @Test annotation to this method.
+   */
+  public void testReadWriteDeadlocks() {
+    
+    // deadlock with three locks
+    Object lock1 = new String("lock1");
+    Object lock2 = new String("lock2");
+    Object lock3 = new String("lock3");
+
+    
+    threads.add(new DeadlockingThreadReadWriteLock("t1", lock1, lock2));
+    threads.add(new DeadlockingThreadReadWriteLock("t2", lock2, lock3));
+    threads.add(new DeadlockingThreadReadWriteLock("t3", lock3, lock1));
+
+    // deadlock with two locks
+    Object lock4 = new String("lock4");
+    Object lock5 = new String("lock5");
+
+    threads.add(new DeadlockingThreadReadWriteLock("t4", lock4, lock5));
+    threads.add(new DeadlockingThreadReadWriteLock("t5", lock5, lock4));
+    DeadlockWarningThread wt = new DeadlockWarningThread(threads);
+
+    
+
+    while (true) {
+      if(!wt.isAlive()) {
+          break;
+      }
+    }
+    if (wt.isDeadlocked()){
+      Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked());
+      Assert.assertFalse(wt.getErrorMessages().toString().equals(""));
+    } else {
+      Assert.assertTrue(wt.getErrorMessages().toString(), wt.isDeadlocked());
+    }
+    
+  }
+  
+  
+  /**
+   * There is absolutely nothing you can do when you have
+   * deadlocked threads.  You cannot stop them, you cannot
+  * interrupt them, you cannot tell them to stop trying to
+  * get a lock, and you also cannot tell them to let go of
+  * the locks that they own.
+  */
+  private static class DeadlockingThread extends Thread {
+    private final Object lock1;
+    private final Object lock2;
+
+    public DeadlockingThread(String name, Object lock1, Object lock2) {
+      super(name);
+      this.lock1 = lock1;
+      this.lock2 = lock2;
+      start();
+    }
+    public void run() {
+      while (true) {
+        f();
+      }
+    }
+    private void f() {
+      synchronized (lock1) {
+        g();
+      }
+    }
+    private void g() {
+      synchronized (lock2) {
+        // do some work...
+        for (int i = 0; i < 1000 * 1000; i++) ;
+      }
+    }
+  }
+  
+  private static class DeadlockingThreadReadWriteLock extends Thread {
+    private final Object lock1;
+    private final Object lock2;
+    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+    public final Lock r = rwl.readLock();
+    public final Lock w = rwl.writeLock();
+
+    public DeadlockingThreadReadWriteLock(String name, Object lock1, Object lock2) {
+      super(name);
+      this.lock1 = lock1;
+      this.lock2 = lock2;
+      start();
+    }
+    public void run() {
+      while (true) {
+        f();
+      }
+    }
+    private void f() {
+      w.lock(); {
+        g();
+      } w.unlock();
+    }
+    
+    private void g() {
+      r.lock(); {
+        // do some work...
+        for (int i = 0; i < 1000 * 1000; i++) ;
+      } r.unlock();
+    }
+  }
+  
+}
\ No newline at end of file