You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dj...@apache.org on 2020/04/18 17:10:55 UTC

[cassandra] branch trunk updated: Fix flaky SEPExecutor.changingMaxWorkersMeetsConcurrencyGoalsTest

This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9ddaf1  Fix flaky SEPExecutor.changingMaxWorkersMeetsConcurrencyGoalsTest
f9ddaf1 is described below

commit f9ddaf1841147fc284e802739ca42403aa2816ae
Author: Jon Meredith <jm...@apple.com>
AuthorDate: Thu Apr 9 16:59:53 2020 -0600

    Fix flaky SEPExecutor.changingMaxWorkersMeetsConcurrencyGoalsTest
    
    Thread scheduling is not guaranteed to be fair and having the BusyWork
    tasks reschedule itself makes sure there is always more work for
    the SEPWorker once it finishes, so it can hog all the CPU if
    run with a low number of cores.  To randomize the scheduling better,
    introduce a second thread that keeps the executor primed with work,
    but guarantees a thread switch by waiting on the sempahore.
    
    Also resolves a cleanup bug - the sharedPool was not being shutdown
    correctly.
    
    Patch by Jon Meredith; reviewed by David Capwell and Dinesh Joshi for CASSANDRA-15709
---
 .../cassandra/concurrent/SEPExecutorTest.java      | 85 +++++++++++++---------
 1 file changed, 52 insertions(+), 33 deletions(-)

diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index e3d8556..9a2d52d 100644
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -23,7 +23,9 @@ import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -32,6 +34,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.utils.FBUtilities;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 
 public class SEPExecutorTest
@@ -75,48 +78,68 @@ public class SEPExecutorTest
     }
 
     @Test
-    public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException
+    public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException, TimeoutException
     {
-        final int numBusyWorkers = 2; // Number of busy worker threads to run and gum things up
+        // Number of busy worker threads to run and gum things up. Chosen to be
+        // between the low and high max pool size so the test exercises resizing
+        // under a number of different conditions.
+        final int numBusyWorkers = 2;
         SharedExecutorPool sharedPool = new SharedExecutorPool("ChangingMaxWorkersMeetsConcurrencyGoalsTest");
         final AtomicInteger notifiedMaxPoolSize = new AtomicInteger();
 
         LocalAwareExecutorService executor = sharedPool.newExecutor(0, notifiedMaxPoolSize::set, 4, "internal", "resizetest");
 
+        // Keep feeding the executor work while resizing
+        // so it stays under load.
         AtomicBoolean stayBusy = new AtomicBoolean(true);
-        for (int i = 0; i < numBusyWorkers; i++)
-        {
-            executor.execute(new BusyWork(executor, stayBusy));
-        }
+        Semaphore busyWorkerPermits = new Semaphore(numBusyWorkers);
+        Thread makeBusy = new Thread(() -> {
+            while (stayBusy.get() == true)
+            {
+                try
+                {
+                    if (busyWorkerPermits.tryAcquire(1, MILLISECONDS)) {
+                        executor.execute(new BusyWork(busyWorkerPermits));
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    // ignore, will either stop looping if done or retry the lock
+                }
+            }
+        });
 
-        final int previousConcurrency = executor.getMaximumPoolSize();
+        makeBusy.start();
         try
         {
-            assertMaxTaskConcurrency(executor, 1);
-            Assert.assertEquals(1, notifiedMaxPoolSize.get());
+            for (int repeat = 0; repeat < 1000; repeat++)
+            {
+                assertMaxTaskConcurrency(executor, 1);
+                Assert.assertEquals(1, notifiedMaxPoolSize.get());
 
-            assertMaxTaskConcurrency(executor, 2);
-            Assert.assertEquals(2, notifiedMaxPoolSize.get());
+                assertMaxTaskConcurrency(executor, 2);
+                Assert.assertEquals(2, notifiedMaxPoolSize.get());
 
-            assertMaxTaskConcurrency(executor, 1);
-            Assert.assertEquals(1, notifiedMaxPoolSize.get());
+                assertMaxTaskConcurrency(executor, 1);
+                Assert.assertEquals(1, notifiedMaxPoolSize.get());
 
-            assertMaxTaskConcurrency(executor, 3);
-            Assert.assertEquals(3, notifiedMaxPoolSize.get());
+                assertMaxTaskConcurrency(executor, 3);
+                Assert.assertEquals(3, notifiedMaxPoolSize.get());
 
-            executor.setMaximumPoolSize(0);
-            Assert.assertEquals(0, notifiedMaxPoolSize.get());
+                executor.setMaximumPoolSize(0);
+                Assert.assertEquals(0, notifiedMaxPoolSize.get());
 
-            assertMaxTaskConcurrency(executor, 4);
-            Assert.assertEquals(4, notifiedMaxPoolSize.get());
+                assertMaxTaskConcurrency(executor, 4);
+                Assert.assertEquals(4, notifiedMaxPoolSize.get());
+            }
         }
         finally
         {
             stayBusy.set(false);
-            executor.setMaximumPoolSize(previousConcurrency);
-            executor.shutdownNow();
-            Assert.assertTrue(executor.isShutdown());
-            Assert.assertTrue(executor.awaitTermination(1L, TimeUnit.MINUTES));
+            makeBusy.join(TimeUnit.SECONDS.toMillis(5));
+            Assert.assertFalse("makeBusy thread should have checked stayBusy and exited",
+                               makeBusy.isAlive());
+            sharedPool.shutdownAndWait(1L, MINUTES);
         }
     }
 
@@ -149,21 +172,16 @@ public class SEPExecutorTest
 
     static class BusyWork implements Runnable
     {
-        private ExecutorService executor;
-        private AtomicBoolean stayBusy;
+        private Semaphore busyWorkers;
 
-        public BusyWork(ExecutorService executor, AtomicBoolean stayBusy)
+        public BusyWork(Semaphore busyWorkers)
         {
-            this.executor = executor;
-            this.stayBusy = stayBusy;
+            this.busyWorkers = busyWorkers;
         }
 
         public void run()
         {
-            if (stayBusy.get())
-            {
-                executor.execute(new BusyWork(executor, stayBusy));
-            }
+            busyWorkers.release();
         }
     }
 
@@ -177,6 +195,7 @@ public class SEPExecutorTest
             executor.execute(new LatchWaiter(concurrencyGoal, 5L, TimeUnit.SECONDS));
         }
         // Will return true if all of the LatchWaiters count down before the timeout
-        Assert.assertEquals(true, concurrencyGoal.await(3L, TimeUnit.SECONDS));
+        Assert.assertEquals("Test tasks did not hit max concurrency goal",
+                            true, concurrencyGoal.await(3L, TimeUnit.SECONDS));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org